mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
test: add test to reproduce a lot of memory consumtion during migration (#3939)
This commit is contained in:
parent
ef814f6670
commit
866c82a3fa
1 changed files with 55 additions and 0 deletions
|
@ -1991,3 +1991,58 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
|
|||
await c_replica.execute_command("REPLICAOF NO ONE")
|
||||
capture = await seeder.capture()
|
||||
assert await seeder.compare(capture, replica.port)
|
||||
|
||||
|
||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||
async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFactory):
|
||||
# Check data migration from one node to another
|
||||
instances = [
|
||||
df_factory.create(
|
||||
port=BASE_PORT + i,
|
||||
admin_port=BASE_PORT + i + 1000,
|
||||
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
|
||||
)
|
||||
for i in range(3)
|
||||
]
|
||||
|
||||
replica = df_factory.create(
|
||||
port=BASE_PORT + 3,
|
||||
admin_port=BASE_PORT + 3 + 1000,
|
||||
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
|
||||
)
|
||||
|
||||
df_factory.start_all(instances)
|
||||
df_factory.start_all([replica])
|
||||
|
||||
nodes = [(await create_node_info(instance)) for instance in instances]
|
||||
nodes[0].slots = [(0, 16383)]
|
||||
nodes[1].slots = []
|
||||
nodes[2].slots = []
|
||||
await replica.admin_client().execute_command(f"replicaof localhost {nodes[0].instance.port}")
|
||||
|
||||
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||
|
||||
await nodes[0].client.execute_command("DEBUG POPULATE 2250000 test 1000 RAND SLOTS 0 16383")
|
||||
|
||||
await asyncio.sleep(2)
|
||||
|
||||
nodes[0].migrations.append(
|
||||
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 8000)], nodes[1].id)
|
||||
)
|
||||
nodes[0].migrations.append(
|
||||
MigrationInfo("127.0.0.1", nodes[2].instance.admin_port, [(8001, 16383)], nodes[2].id)
|
||||
)
|
||||
|
||||
logging.debug("Start migration")
|
||||
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||
|
||||
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED", 1000)
|
||||
|
||||
nodes[0].migrations = []
|
||||
nodes[0].slots = []
|
||||
nodes[1].slots = [(0, 8000)]
|
||||
nodes[2].slots = [(8001, 16383)]
|
||||
logging.debug("remove finished migrations")
|
||||
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||
|
||||
await check_for_no_state_status([node.admin_client for node in nodes])
|
||||
|
|
Loading…
Reference in a new issue