mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: do not close connections at the end of pytest (#3811)
A common case is that we need to clean up a connection before we exit a test via .close() method. This is needed because otherwise the connection will raise a warning that it is left unclosed. However, remembering to call .close() at each connection at the end of the test is cumbersome! Luckily, fixtures in python can be marked as async which allow us to: * cache all clients created by DflyInstance.client() * clean them all at the end of the fixture in one go --------- Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
parent
ed11c8d3a4
commit
b19f722011
11 changed files with 31 additions and 183 deletions
|
@ -225,9 +225,6 @@ async def test_acl_cat_commands_multi_exec_squash(df_factory):
|
||||||
with pytest.raises(redis.exceptions.NoPermissionError):
|
with pytest.raises(redis.exceptions.NoPermissionError):
|
||||||
await client.execute_command(f"SET x{x} {x}")
|
await client.execute_command(f"SET x{x} {x}")
|
||||||
|
|
||||||
await admin_client.close()
|
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_acl_deluser(df_server):
|
async def test_acl_deluser(df_server):
|
||||||
|
@ -248,8 +245,6 @@ async def test_acl_deluser(df_server):
|
||||||
|
|
||||||
assert await client.execute_command("ACL WHOAMI") == "User is default"
|
assert await client.execute_command("ACL WHOAMI") == "User is default"
|
||||||
|
|
||||||
await close_clients(admin_client, client)
|
|
||||||
|
|
||||||
|
|
||||||
script = """
|
script = """
|
||||||
for i = 1, 100000 do
|
for i = 1, 100000 do
|
||||||
|
@ -279,8 +274,6 @@ async def test_acl_del_user_while_running_lua_script(df_server):
|
||||||
res = await admin_client.get(f"key{i}")
|
res = await admin_client.get(f"key{i}")
|
||||||
assert res == "100000"
|
assert res == "100000"
|
||||||
|
|
||||||
await admin_client.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.skip("Non deterministic")
|
@pytest.mark.skip("Non deterministic")
|
||||||
|
@ -299,9 +292,6 @@ async def test_acl_with_long_running_script(df_server):
|
||||||
res = await admin_client.get(f"key{i}")
|
res = await admin_client.get(f"key{i}")
|
||||||
assert res == "100000"
|
assert res == "100000"
|
||||||
|
|
||||||
await client.close()
|
|
||||||
await admin_client.close()
|
|
||||||
|
|
||||||
|
|
||||||
def create_temp_file(content, tmp_dir):
|
def create_temp_file(content, tmp_dir):
|
||||||
file = tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False)
|
file = tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False)
|
||||||
|
@ -325,8 +315,6 @@ async def test_bad_acl_file(df_factory, tmp_dir):
|
||||||
with pytest.raises(redis.exceptions.ResponseError):
|
with pytest.raises(redis.exceptions.ResponseError):
|
||||||
await client.execute_command("ACL LOAD")
|
await client.execute_command("ACL LOAD")
|
||||||
|
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@dfly_args({"port": 1111})
|
@dfly_args({"port": 1111})
|
||||||
|
@ -381,8 +369,6 @@ async def test_good_acl_file(df_factory, tmp_dir):
|
||||||
assert "user vlad off ~foo ~bar* resetchannels -@all +@string" in result
|
assert "user vlad off ~foo ~bar* resetchannels -@all +@string" in result
|
||||||
assert "user default on nopass ~* &* +@all" in result
|
assert "user default on nopass ~* &* +@all" in result
|
||||||
|
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_acl_log(async_client):
|
async def test_acl_log(async_client):
|
||||||
|
@ -463,8 +449,6 @@ async def test_require_pass(df_factory):
|
||||||
res = await client.execute_command("GET foo")
|
res = await client.execute_command("GET foo")
|
||||||
assert res == "44"
|
assert res == "44"
|
||||||
|
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@dfly_args({"port": 1111, "requirepass": "temp"})
|
@dfly_args({"port": 1111, "requirepass": "temp"})
|
||||||
|
@ -480,7 +464,6 @@ async def test_require_pass_with_acl_file_order(df_factory, tmp_dir):
|
||||||
client = aioredis.Redis(username="default", password="jordan", port=df.port)
|
client = aioredis.Redis(username="default", password="jordan", port=df.port)
|
||||||
|
|
||||||
assert await client.set("foo", "bar")
|
assert await client.set("foo", "bar")
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
@ -608,8 +591,6 @@ async def test_namespaces(df_server):
|
||||||
assert await roman.execute_command("AUTH roman roman_pass") == "OK"
|
assert await roman.execute_command("AUTH roman roman_pass") == "OK"
|
||||||
assert await roman.execute_command("GET foo") == None
|
assert await roman.execute_command("GET foo") == None
|
||||||
|
|
||||||
await close_clients(admin, adi, shahar, roman)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_default_user_bug(df_server):
|
async def test_default_user_bug(df_server):
|
||||||
|
@ -623,8 +604,6 @@ async def test_default_user_bug(df_server):
|
||||||
with pytest.raises(redis.exceptions.ResponseError):
|
with pytest.raises(redis.exceptions.ResponseError):
|
||||||
await client.execute_command("SET foo bar")
|
await client.execute_command("SET foo bar")
|
||||||
|
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_auth_resp3_bug(df_factory):
|
async def test_auth_resp3_bug(df_factory):
|
||||||
|
@ -642,8 +621,6 @@ async def test_auth_resp3_bug(df_factory):
|
||||||
assert res["role"] == "master"
|
assert res["role"] == "master"
|
||||||
assert res["id"] == 1
|
assert res["id"] == 1
|
||||||
|
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_acl_pub_sub_auth(df_factory):
|
async def test_acl_pub_sub_auth(df_factory):
|
||||||
|
|
|
@ -161,5 +161,4 @@ async def test_cluster_mgr(df_factory):
|
||||||
for i in range(NODES):
|
for i in range(NODES):
|
||||||
assert run_cluster_mgr(["--action=detach", f"--target_port={replicas[i].port}"])
|
assert run_cluster_mgr(["--action=detach", f"--target_port={replicas[i].port}"])
|
||||||
await check_cluster_data(client)
|
await check_cluster_data(client)
|
||||||
|
await client.close()
|
||||||
await close_clients(client, *replica_clients, c_master0)
|
|
||||||
|
|
|
@ -332,8 +332,6 @@ async def test_emulated_cluster_with_replicas(df_factory):
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
await close_clients(c_master, *c_replicas)
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"cluster_mode": "emulated"})
|
@dfly_args({"cluster_mode": "emulated"})
|
||||||
async def test_cluster_info(async_client):
|
async def test_cluster_info(async_client):
|
||||||
|
@ -391,8 +389,6 @@ async def test_cluster_node_id(df_factory: DflyInstanceFactory):
|
||||||
conn = node.client()
|
conn = node.client()
|
||||||
assert "inigo montoya" == await get_node_id(conn)
|
assert "inigo montoya" == await get_node_id(conn)
|
||||||
|
|
||||||
await close_clients(conn)
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
|
async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
|
||||||
|
@ -520,8 +516,6 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
|
||||||
assert (await c_nodes[0].get("KEY0")) == "value"
|
assert (await c_nodes[0].get("KEY0")) == "value"
|
||||||
assert await c_nodes[1].execute_command("DBSIZE") == 0
|
assert await c_nodes[1].execute_command("DBSIZE") == 0
|
||||||
|
|
||||||
await close_clients(*c_nodes, *c_nodes_admin)
|
|
||||||
|
|
||||||
|
|
||||||
# Tests that master commands to the replica are applied regardless of slot ownership
|
# Tests that master commands to the replica are applied regardless of slot ownership
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
|
@ -631,8 +625,6 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto
|
||||||
assert await c_master.execute_command("dbsize") == 0
|
assert await c_master.execute_command("dbsize") == 0
|
||||||
assert await c_replica.execute_command("dbsize") == 0
|
assert await c_replica.execute_command("dbsize") == 0
|
||||||
|
|
||||||
await close_clients(c_master, c_master_admin, c_replica, c_replica_admin)
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceFactory):
|
async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceFactory):
|
||||||
|
@ -740,8 +732,6 @@ async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceF
|
||||||
assert await c_master.execute_command("dbsize") == (100_000 - slot_0_size)
|
assert await c_master.execute_command("dbsize") == (100_000 - slot_0_size)
|
||||||
assert await c_replica.execute_command("dbsize") == (100_000 - slot_0_size)
|
assert await c_replica.execute_command("dbsize") == (100_000 - slot_0_size)
|
||||||
|
|
||||||
await close_clients(c_master, c_master_admin, c_replica, c_replica_admin)
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "admin_port": 30001})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "admin_port": 30001})
|
||||||
async def test_cluster_blocking_command(df_server):
|
async def test_cluster_blocking_command(df_server):
|
||||||
|
@ -786,8 +776,6 @@ async def test_cluster_blocking_command(df_server):
|
||||||
await v2
|
await v2
|
||||||
assert "MOVED" in str(e_info.value)
|
assert "MOVED" in str(e_info.value)
|
||||||
|
|
||||||
await close_clients(c_master, c_master_admin)
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
|
async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
|
||||||
|
@ -827,8 +815,6 @@ async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
|
||||||
await list_task
|
await list_task
|
||||||
assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value)
|
assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value)
|
||||||
|
|
||||||
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("set_cluster_node_id", [True, False])
|
@pytest.mark.parametrize("set_cluster_node_id", [True, False])
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
|
@ -847,7 +833,7 @@ async def test_cluster_native_client(
|
||||||
for i in range(3)
|
for i in range(3)
|
||||||
]
|
]
|
||||||
df_factory.start_all(masters)
|
df_factory.start_all(masters)
|
||||||
c_masters = [aioredis.Redis(port=master.port) for master in masters]
|
c_masters = [master.client() for master in masters]
|
||||||
c_masters_admin = [master.admin_client() for master in masters]
|
c_masters_admin = [master.admin_client() for master in masters]
|
||||||
master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin))
|
master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin))
|
||||||
|
|
||||||
|
@ -935,7 +921,7 @@ async def test_cluster_native_client(
|
||||||
seeder = df_seeder_factory.create(port=masters[0].port, cluster_mode=True)
|
seeder = df_seeder_factory.create(port=masters[0].port, cluster_mode=True)
|
||||||
await seeder.run(target_deviation=0.1)
|
await seeder.run(target_deviation=0.1)
|
||||||
|
|
||||||
client = aioredis.RedisCluster(decode_responses=True, host="localhost", port=masters[0].port)
|
client = masters[0].cluster_client()
|
||||||
|
|
||||||
assert await client.set("key0", "value") == True
|
assert await client.set("key0", "value") == True
|
||||||
assert await client.get("key0") == "value"
|
assert await client.get("key0") == "value"
|
||||||
|
@ -1025,7 +1011,6 @@ async def test_cluster_native_client(
|
||||||
await push_config(config, c_masters_admin + c_replicas_admin)
|
await push_config(config, c_masters_admin + c_replicas_admin)
|
||||||
|
|
||||||
await test_random_keys()
|
await test_random_keys()
|
||||||
await close_clients(client, *c_masters, *c_masters_admin, *c_replicas, *c_replicas_admin)
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
|
@ -1073,7 +1058,6 @@ async def test_config_consistency(df_factory: DflyInstanceFactory):
|
||||||
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
await check_for_no_state_status([node.admin_client for node in nodes])
|
await check_for_no_state_status([node.admin_client for node in nodes])
|
||||||
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
|
@ -1129,8 +1113,6 @@ async def test_cluster_flushall_during_migration(
|
||||||
|
|
||||||
assert await nodes[0].client.dbsize() == 0
|
assert await nodes[0].client.dbsize() == 0
|
||||||
|
|
||||||
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("interrupt", [False, True])
|
@pytest.mark.parametrize("interrupt", [False, True])
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
|
@ -1180,7 +1162,6 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
|
||||||
assert 0 in slots[0] and 9000 in slots[0]
|
assert 0 in slots[0] and 9000 in slots[0]
|
||||||
assert 9001 in slots[1] and 16383 in slots[1]
|
assert 9001 in slots[1] and 16383 in slots[1]
|
||||||
|
|
||||||
await close_clients(*[n.client for n in nodes], *[n.admin_client for n in nodes])
|
|
||||||
return
|
return
|
||||||
|
|
||||||
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
|
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
|
||||||
|
@ -1214,7 +1195,6 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
|
||||||
assert await nodes[1].client.execute_command("DBSIZE") == 19
|
assert await nodes[1].client.execute_command("DBSIZE") == 19
|
||||||
|
|
||||||
await check_for_no_state_status([node.admin_client for node in nodes])
|
await check_for_no_state_status([node.admin_client for node in nodes])
|
||||||
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"})
|
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"})
|
||||||
|
@ -1265,8 +1245,6 @@ async def test_migration_with_key_ttl(df_factory):
|
||||||
assert await nodes[1].client.execute_command("ttl k_without_ttl") == -1
|
assert await nodes[1].client.execute_command("ttl k_without_ttl") == -1
|
||||||
assert await nodes[1].client.execute_command("stick k_sticky") == 0
|
assert await nodes[1].client.execute_command("stick k_sticky") == 0
|
||||||
|
|
||||||
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
async def test_network_disconnect_during_migration(df_factory, df_seeder_factory):
|
async def test_network_disconnect_during_migration(df_factory, df_seeder_factory):
|
||||||
|
@ -1314,8 +1292,6 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory
|
||||||
capture = await seeder.capture()
|
capture = await seeder.capture()
|
||||||
assert await seeder.compare(capture, nodes[1].instance.port)
|
assert await seeder.compare(capture, nodes[1].instance.port)
|
||||||
|
|
||||||
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"node_count, segments, keys",
|
"node_count, segments, keys",
|
||||||
|
@ -1366,9 +1342,7 @@ async def test_cluster_fuzzymigration(
|
||||||
|
|
||||||
# Start ten counters
|
# Start ten counters
|
||||||
counter_keys = [f"_counter{i}" for i in range(10)]
|
counter_keys = [f"_counter{i}" for i in range(10)]
|
||||||
counter_connections = [
|
counter_connections = [nodes[0].instance.cluster_client() for _ in range(10)]
|
||||||
aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port) for _ in range(10)
|
|
||||||
]
|
|
||||||
counters = [
|
counters = [
|
||||||
asyncio.create_task(list_counter(key, conn))
|
asyncio.create_task(list_counter(key, conn))
|
||||||
for key, conn in zip(counter_keys, counter_connections)
|
for key, conn in zip(counter_keys, counter_connections)
|
||||||
|
@ -1454,7 +1428,7 @@ async def test_cluster_fuzzymigration(
|
||||||
await counter
|
await counter
|
||||||
|
|
||||||
# Check counter consistency
|
# Check counter consistency
|
||||||
cluster_client = aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port)
|
cluster_client = nodes[0].instance.cluster_client()
|
||||||
for key in counter_keys:
|
for key in counter_keys:
|
||||||
counter_list = await cluster_client.lrange(key, 0, -1)
|
counter_list = await cluster_client.lrange(key, 0, -1)
|
||||||
for i, j in zip(counter_list, counter_list[1:]):
|
for i, j in zip(counter_list, counter_list[1:]):
|
||||||
|
@ -1464,9 +1438,6 @@ async def test_cluster_fuzzymigration(
|
||||||
assert await seeder.compare(capture, nodes[0].instance.port)
|
assert await seeder.compare(capture, nodes[0].instance.port)
|
||||||
|
|
||||||
await asyncio.gather(*[c.close() for c in counter_connections])
|
await asyncio.gather(*[c.close() for c in counter_connections])
|
||||||
await close_clients(
|
|
||||||
cluster_client, *[node.admin_client for node in nodes], *[node.client for node in nodes]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
|
@ -1517,8 +1488,6 @@ async def test_cluster_config_reapply(df_factory: DflyInstanceFactory):
|
||||||
for i in range(SIZE):
|
for i in range(SIZE):
|
||||||
assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")
|
assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")
|
||||||
|
|
||||||
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
async def test_cluster_replication_migration(
|
async def test_cluster_replication_migration(
|
||||||
|
@ -1597,10 +1566,9 @@ async def test_cluster_replication_migration(
|
||||||
assert await seeder.compare(r2_capture, r1_node.instance.port)
|
assert await seeder.compare(r2_capture, r1_node.instance.port)
|
||||||
assert await seeder.compare(r1_capture, r2_node.instance.port)
|
assert await seeder.compare(r1_capture, r2_node.instance.port)
|
||||||
|
|
||||||
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
|
||||||
|
@pytest.mark.asyncio
|
||||||
async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
|
async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
|
||||||
"""Check data migration from one node to another."""
|
"""Check data migration from one node to another."""
|
||||||
instances = [
|
instances = [
|
||||||
|
@ -1660,8 +1628,6 @@ async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
|
||||||
for i in range(SIZE):
|
for i in range(SIZE):
|
||||||
assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")
|
assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")
|
||||||
|
|
||||||
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
|
|
||||||
def parse_lag(replication_info: str):
|
def parse_lag(replication_info: str):
|
||||||
lags = re.findall("lag=([0-9]+)\r\n", replication_info)
|
lags = re.findall("lag=([0-9]+)\r\n", replication_info)
|
||||||
|
@ -1748,8 +1714,6 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact
|
||||||
capture = await seeder.capture()
|
capture = await seeder.capture()
|
||||||
assert await seeder.compare(capture, replica.port)
|
assert await seeder.compare(capture, replica.port)
|
||||||
|
|
||||||
await disconnect_clients(*c_nodes, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
async def await_stable_sync(m_client: aioredis.Redis, replica_port, timeout=10):
|
async def await_stable_sync(m_client: aioredis.Redis, replica_port, timeout=10):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
|
@ -1863,7 +1827,6 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_
|
||||||
capture = await seeder.capture()
|
capture = await seeder.capture()
|
||||||
assert await seeder.compare(capture, replica.port)
|
assert await seeder.compare(capture, replica.port)
|
||||||
|
|
||||||
await disconnect_clients(*c_nodes, c_replica)
|
|
||||||
await proxy.close(proxy_task)
|
await proxy.close(proxy_task)
|
||||||
|
|
||||||
|
|
||||||
|
@ -1940,8 +1903,6 @@ async def test_replicate_redis_cluster(redis_cluster, df_factory, df_seeder_fact
|
||||||
capture = await seeder.capture()
|
capture = await seeder.capture()
|
||||||
assert await seeder.compare(capture, replica.port)
|
assert await seeder.compare(capture, replica.port)
|
||||||
|
|
||||||
await disconnect_clients(c_replica, *node_clients)
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4})
|
@dfly_args({"proactor_threads": 4})
|
||||||
async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_seeder_factory):
|
async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_seeder_factory):
|
||||||
|
@ -2030,5 +1991,3 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
|
||||||
await c_replica.execute_command("REPLICAOF NO ONE")
|
await c_replica.execute_command("REPLICAOF NO ONE")
|
||||||
capture = await seeder.capture()
|
capture = await seeder.capture()
|
||||||
assert await seeder.compare(capture, replica.port)
|
assert await seeder.compare(capture, replica.port)
|
||||||
|
|
||||||
await disconnect_clients(c_replica, *node_clients)
|
|
||||||
|
|
|
@ -112,8 +112,8 @@ def parse_args(args: List[str]) -> Dict[str, Union[str, None]]:
|
||||||
return args_dict
|
return args_dict
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function", params=[{}])
|
@pytest_asyncio.fixture(scope="function", params=[{}])
|
||||||
def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
|
async def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
|
||||||
"""
|
"""
|
||||||
Create an instance factory with supplied params.
|
Create an instance factory with supplied params.
|
||||||
"""
|
"""
|
||||||
|
@ -142,7 +142,7 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
|
||||||
|
|
||||||
factory = DflyInstanceFactory(params, args)
|
factory = DflyInstanceFactory(params, args)
|
||||||
yield factory
|
yield factory
|
||||||
factory.stop_all()
|
await factory.stop_all()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
|
@ -185,23 +185,6 @@ def connection(df_server: DflyInstance):
|
||||||
return redis.Connection(port=df_server.port)
|
return redis.Connection(port=df_server.port)
|
||||||
|
|
||||||
|
|
||||||
# @pytest.fixture(scope="class")
|
|
||||||
# def sync_pool(df_server: DflyInstance):
|
|
||||||
# pool = redis.ConnectionPool(decode_responses=True, port=df_server.port)
|
|
||||||
# yield pool
|
|
||||||
# pool.disconnect()
|
|
||||||
|
|
||||||
|
|
||||||
# @pytest.fixture(scope="class")
|
|
||||||
# def client(sync_pool):
|
|
||||||
# """
|
|
||||||
# Return a client to the default instance with all entries flushed.
|
|
||||||
# """
|
|
||||||
# client = redis.Redis(connection_pool=sync_pool)
|
|
||||||
# client.flushall()
|
|
||||||
# return client
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
def cluster_client(df_server):
|
def cluster_client(df_server):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -646,7 +646,6 @@ async def test_reject_non_tls_connections_on_tls(with_tls_server_args, df_factor
|
||||||
|
|
||||||
client = server.admin_client(password="XXX")
|
client = server.admin_client(password="XXX")
|
||||||
assert await client.dbsize() == 0
|
assert await client.dbsize() == 0
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_tls_insecure(with_ca_tls_server_args, with_tls_client_args, df_factory):
|
async def test_tls_insecure(with_ca_tls_server_args, with_tls_client_args, df_factory):
|
||||||
|
@ -655,7 +654,6 @@ async def test_tls_insecure(with_ca_tls_server_args, with_tls_client_args, df_fa
|
||||||
|
|
||||||
client = aioredis.Redis(port=server.port, **with_tls_client_args, ssl_cert_reqs=None)
|
client = aioredis.Redis(port=server.port, **with_tls_client_args, ssl_cert_reqs=None)
|
||||||
assert await client.dbsize() == 0
|
assert await client.dbsize() == 0
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_tls_full_auth(with_ca_tls_server_args, with_ca_tls_client_args, df_factory):
|
async def test_tls_full_auth(with_ca_tls_server_args, with_ca_tls_client_args, df_factory):
|
||||||
|
@ -664,7 +662,6 @@ async def test_tls_full_auth(with_ca_tls_server_args, with_ca_tls_client_args, d
|
||||||
|
|
||||||
client = aioredis.Redis(port=server.port, **with_ca_tls_client_args)
|
client = aioredis.Redis(port=server.port, **with_ca_tls_client_args)
|
||||||
assert await client.dbsize() == 0
|
assert await client.dbsize() == 0
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_tls_reject(
|
async def test_tls_reject(
|
||||||
|
@ -680,7 +677,6 @@ async def test_tls_reject(
|
||||||
client = server.client(**with_tls_client_args)
|
client = server.client(**with_tls_client_args)
|
||||||
with pytest.raises(redis_conn_error):
|
with pytest.raises(redis_conn_error):
|
||||||
await client.ping()
|
await client.ping()
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": "4", "pipeline_squash": 10})
|
@dfly_args({"proactor_threads": "4", "pipeline_squash": 10})
|
||||||
|
@ -890,7 +886,6 @@ async def test_tls_when_read_write_is_interleaved(
|
||||||
# This deadlocks
|
# This deadlocks
|
||||||
client = aioredis.Redis(port=server.port, **with_ca_tls_client_args)
|
client = aioredis.Redis(port=server.port, **with_ca_tls_client_args)
|
||||||
await client.execute_command("GET foo")
|
await client.execute_command("GET foo")
|
||||||
await client.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_lib_name_ver(async_client: aioredis.Redis):
|
async def test_lib_name_ver(async_client: aioredis.Redis):
|
||||||
|
|
|
@ -12,6 +12,7 @@ import psutil
|
||||||
import itertools
|
import itertools
|
||||||
from prometheus_client.parser import text_string_to_metric_families
|
from prometheus_client.parser import text_string_to_metric_families
|
||||||
from redis.asyncio import Redis as RedisClient
|
from redis.asyncio import Redis as RedisClient
|
||||||
|
from redis.asyncio import RedisCluster as RedisCluster
|
||||||
import signal
|
import signal
|
||||||
|
|
||||||
|
|
||||||
|
@ -95,6 +96,7 @@ class DflyInstance:
|
||||||
self.log_files: List[str] = []
|
self.log_files: List[str] = []
|
||||||
self.dynamic_port = False
|
self.dynamic_port = False
|
||||||
self.sed_proc = None
|
self.sed_proc = None
|
||||||
|
self.clients = []
|
||||||
|
|
||||||
if self.params.existing_port:
|
if self.params.existing_port:
|
||||||
self._port = self.params.existing_port
|
self._port = self.params.existing_port
|
||||||
|
@ -126,16 +128,31 @@ class DflyInstance:
|
||||||
|
|
||||||
def client(self, *args, **kwargs) -> RedisClient:
|
def client(self, *args, **kwargs) -> RedisClient:
|
||||||
host = "localhost" if self["bind"] is None else self["bind"]
|
host = "localhost" if self["bind"] is None else self["bind"]
|
||||||
return RedisClient(host=host, port=self.port, decode_responses=True, *args, **kwargs)
|
client = RedisClient(host=host, port=self.port, decode_responses=True, *args, **kwargs)
|
||||||
|
self.clients.append(client)
|
||||||
|
return client
|
||||||
|
|
||||||
def admin_client(self, *args, **kwargs) -> RedisClient:
|
def admin_client(self, *args, **kwargs) -> RedisClient:
|
||||||
return RedisClient(
|
client = RedisClient(
|
||||||
port=self.admin_port,
|
port=self.admin_port,
|
||||||
single_connection_client=True,
|
single_connection_client=True,
|
||||||
decode_responses=True,
|
decode_responses=True,
|
||||||
*args,
|
*args,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
self.clients.append(client)
|
||||||
|
return client
|
||||||
|
|
||||||
|
def cluster_client(self, *args, **kwargs) -> RedisCluster:
|
||||||
|
client = RedisCluster(
|
||||||
|
host="localhost", port=self.port, decode_responses=True, *args, **kwargs
|
||||||
|
)
|
||||||
|
self.clients.append(client)
|
||||||
|
return client
|
||||||
|
|
||||||
|
async def close_clients(self):
|
||||||
|
for client in self.clients:
|
||||||
|
await client.close()
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
self.start()
|
self.start()
|
||||||
|
@ -422,9 +439,10 @@ class DflyInstanceFactory:
|
||||||
for instance in instances:
|
for instance in instances:
|
||||||
instance._wait_for_server()
|
instance._wait_for_server()
|
||||||
|
|
||||||
def stop_all(self):
|
async def stop_all(self):
|
||||||
"""Stop all launched instances."""
|
"""Stop all launched instances."""
|
||||||
for instance in self.instances:
|
for instance in self.instances:
|
||||||
|
await instance.close_clients()
|
||||||
instance.stop()
|
instance.stop()
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
|
|
|
@ -47,8 +47,6 @@ async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements):
|
||||||
assert delta > 0
|
assert delta > 0
|
||||||
assert delta < max_unaccounted
|
assert delta < max_unaccounted
|
||||||
|
|
||||||
await disconnect_clients(client)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@dfly_args(
|
@dfly_args(
|
||||||
|
|
|
@ -132,8 +132,6 @@ async def test_replication_all(
|
||||||
# Check data after stable state stream
|
# Check data after stable state stream
|
||||||
await check()
|
await check()
|
||||||
|
|
||||||
await disconnect_clients(c_master, *c_replicas)
|
|
||||||
|
|
||||||
|
|
||||||
async def check_replica_finished_exec(c_replica: aioredis.Redis, m_offset):
|
async def check_replica_finished_exec(c_replica: aioredis.Redis, m_offset):
|
||||||
role = await c_replica.role()
|
role = await c_replica.role()
|
||||||
|
@ -304,7 +302,6 @@ async def test_disconnect_replica(
|
||||||
|
|
||||||
logging.debug("Check master survived all disconnects")
|
logging.debug("Check master survived all disconnects")
|
||||||
assert await c_master.ping()
|
assert await c_master.ping()
|
||||||
await c_master.close()
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -486,7 +483,6 @@ async def test_cancel_replication_immediately(df_factory, df_seeder_factory: Dfl
|
||||||
assert await seeder.compare(capture, replica.port)
|
assert await seeder.compare(capture, replica.port)
|
||||||
|
|
||||||
ping_job.cancel()
|
ping_job.cancel()
|
||||||
await c_replica.close()
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -902,8 +898,6 @@ async def test_scripts(df_factory, t_master, t_replicas, num_ops, num_keys, num_
|
||||||
l = await c_replica.lrange(k, 0, -1)
|
l = await c_replica.lrange(k, 0, -1)
|
||||||
assert l == [f"{j}"] * num_ops
|
assert l == [f"{j}"] * num_ops
|
||||||
|
|
||||||
await close_clients(c_master, *c_replicas)
|
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4})
|
@dfly_args({"proactor_threads": 4})
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
@ -1145,9 +1139,6 @@ async def test_flushall_in_full_sync(df_factory):
|
||||||
new_syncid, _ = await c_replica.execute_command("DEBUG REPLICA OFFSET")
|
new_syncid, _ = await c_replica.execute_command("DEBUG REPLICA OFFSET")
|
||||||
assert new_syncid != syncid
|
assert new_syncid != syncid
|
||||||
|
|
||||||
await c_master.close()
|
|
||||||
await c_replica.close()
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Test read-only scripts work with replication. EVAL_RO and the 'no-writes' flags are currently not supported.
|
Test read-only scripts work with replication. EVAL_RO and the 'no-writes' flags are currently not supported.
|
||||||
|
@ -1250,8 +1241,6 @@ async def test_take_over_counters(df_factory, master_threads, replica_threads):
|
||||||
replicated_value = await c1.get(key)
|
replicated_value = await c1.get(key)
|
||||||
assert client_value == int(replicated_value)
|
assert client_value == int(replicated_value)
|
||||||
|
|
||||||
await disconnect_clients(c_master, c1, c_blocking, c2, c3)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("master_threads, replica_threads", take_over_cases)
|
@pytest.mark.parametrize("master_threads, replica_threads", take_over_cases)
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
@ -1308,8 +1297,6 @@ async def test_take_over_seeder(
|
||||||
capture = await seeder.capture(port=master.port)
|
capture = await seeder.capture(port=master.port)
|
||||||
assert await seeder.compare(capture, port=replica.port)
|
assert await seeder.compare(capture, port=replica.port)
|
||||||
|
|
||||||
await disconnect_clients(c_master, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("master_threads, replica_threads", [[4, 4]])
|
@pytest.mark.parametrize("master_threads, replica_threads", [[4, 4]])
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
@ -1345,8 +1332,6 @@ async def test_take_over_read_commands(df_factory, master_threads, replica_threa
|
||||||
assert await c_replica.execute_command("role") == ["master", []]
|
assert await c_replica.execute_command("role") == ["master", []]
|
||||||
await promt_task
|
await promt_task
|
||||||
|
|
||||||
await disconnect_clients(c_master, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_take_over_timeout(df_factory, df_seeder_factory):
|
async def test_take_over_timeout(df_factory, df_seeder_factory):
|
||||||
|
@ -1388,8 +1373,6 @@ async def test_take_over_timeout(df_factory, df_seeder_factory):
|
||||||
"online",
|
"online",
|
||||||
]
|
]
|
||||||
|
|
||||||
await disconnect_clients(c_master, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
# 1. Number of master threads
|
# 1. Number of master threads
|
||||||
# 2. Number of threads for each replica
|
# 2. Number of threads for each replica
|
||||||
|
@ -1437,8 +1420,6 @@ async def test_no_tls_on_admin_port(
|
||||||
# 3. Verify that replica dbsize == debug populate key size -- replication works
|
# 3. Verify that replica dbsize == debug populate key size -- replication works
|
||||||
db_size = await c_replica.execute_command("DBSIZE")
|
db_size = await c_replica.execute_command("DBSIZE")
|
||||||
assert 100 == db_size
|
assert 100 == db_size
|
||||||
await c_replica.close()
|
|
||||||
await c_master.close()
|
|
||||||
|
|
||||||
|
|
||||||
# 1. Number of master threads
|
# 1. Number of master threads
|
||||||
|
@ -1509,8 +1490,6 @@ async def test_tls_replication(
|
||||||
db_size = await c_replica.execute_command("DBSIZE")
|
db_size = await c_replica.execute_command("DBSIZE")
|
||||||
assert 101 == db_size
|
assert 101 == db_size
|
||||||
|
|
||||||
await c_replica.close()
|
|
||||||
await c_master.close()
|
|
||||||
await proxy.close(proxy_task)
|
await proxy.close(proxy_task)
|
||||||
|
|
||||||
|
|
||||||
|
@ -1681,15 +1660,12 @@ async def test_df_crash_on_memcached_error(df_factory):
|
||||||
c_replica = replica.client()
|
c_replica = replica.client()
|
||||||
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||||
await wait_available_async(c_replica)
|
await wait_available_async(c_replica)
|
||||||
await c_replica.close()
|
|
||||||
|
|
||||||
memcached_client = pymemcache.Client(f"127.0.0.1:{replica.mc_port}")
|
memcached_client = pymemcache.Client(f"127.0.0.1:{replica.mc_port}")
|
||||||
|
|
||||||
with pytest.raises(pymemcache.exceptions.MemcacheServerError):
|
with pytest.raises(pymemcache.exceptions.MemcacheServerError):
|
||||||
memcached_client.set("key", "data", noreply=False)
|
memcached_client.set("key", "data", noreply=False)
|
||||||
|
|
||||||
await c_master.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_df_crash_on_replicaof_flag(df_factory):
|
async def test_df_crash_on_replicaof_flag(df_factory):
|
||||||
|
@ -1713,9 +1689,6 @@ async def test_df_crash_on_replicaof_flag(df_factory):
|
||||||
res = await c_replica.execute_command("DBSIZE")
|
res = await c_replica.execute_command("DBSIZE")
|
||||||
assert res == 0
|
assert res == 0
|
||||||
|
|
||||||
master.stop()
|
|
||||||
replica.stop()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_network_disconnect(df_factory, df_seeder_factory):
|
async def test_network_disconnect(df_factory, df_seeder_factory):
|
||||||
master = df_factory.create(proactor_threads=6)
|
master = df_factory.create(proactor_threads=6)
|
||||||
|
@ -1746,9 +1719,6 @@ async def test_network_disconnect(df_factory, df_seeder_factory):
|
||||||
finally:
|
finally:
|
||||||
await proxy.close(task)
|
await proxy.close(task)
|
||||||
|
|
||||||
master.stop()
|
|
||||||
replica.stop()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_network_disconnect_active_stream(df_factory, df_seeder_factory):
|
async def test_network_disconnect_active_stream(df_factory, df_seeder_factory):
|
||||||
master = df_factory.create(proactor_threads=4, shard_repl_backlog_len=4000)
|
master = df_factory.create(proactor_threads=4, shard_repl_backlog_len=4000)
|
||||||
|
@ -1787,9 +1757,6 @@ async def test_network_disconnect_active_stream(df_factory, df_seeder_factory):
|
||||||
finally:
|
finally:
|
||||||
await proxy.close(task)
|
await proxy.close(task)
|
||||||
|
|
||||||
master.stop()
|
|
||||||
replica.stop()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_network_disconnect_small_buffer(df_factory, df_seeder_factory):
|
async def test_network_disconnect_small_buffer(df_factory, df_seeder_factory):
|
||||||
master = df_factory.create(proactor_threads=4, shard_repl_backlog_len=1)
|
master = df_factory.create(proactor_threads=4, shard_repl_backlog_len=1)
|
||||||
|
@ -1832,9 +1799,6 @@ async def test_network_disconnect_small_buffer(df_factory, df_seeder_factory):
|
||||||
finally:
|
finally:
|
||||||
await proxy.close(task)
|
await proxy.close(task)
|
||||||
|
|
||||||
master.stop()
|
|
||||||
replica.stop()
|
|
||||||
|
|
||||||
# Partial replication is currently not implemented so the following does not work
|
# Partial replication is currently not implemented so the following does not work
|
||||||
# assert master.is_in_logs("Partial sync requested from stale LSN")
|
# assert master.is_in_logs("Partial sync requested from stale LSN")
|
||||||
|
|
||||||
|
@ -1883,9 +1847,6 @@ async def test_replica_reconnections_after_network_disconnect(df_factory, df_see
|
||||||
finally:
|
finally:
|
||||||
await proxy.close(task)
|
await proxy.close(task)
|
||||||
|
|
||||||
master.stop()
|
|
||||||
replica.stop()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_search(df_factory):
|
async def test_search(df_factory):
|
||||||
master = df_factory.create(proactor_threads=4)
|
master = df_factory.create(proactor_threads=4)
|
||||||
|
@ -1968,8 +1929,6 @@ async def test_search_with_stream(df_factory: DflyInstanceFactory):
|
||||||
["name", "new-secret"],
|
["name", "new-secret"],
|
||||||
]
|
]
|
||||||
|
|
||||||
await close_clients(c_master, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
# @pytest.mark.slow
|
# @pytest.mark.slow
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
@ -2014,8 +1973,6 @@ async def test_client_pause_with_replica(df_factory, df_seeder_factory):
|
||||||
capture = await seeder.capture(port=master.port)
|
capture = await seeder.capture(port=master.port)
|
||||||
assert await seeder.compare(capture, port=replica.port)
|
assert await seeder.compare(capture, port=replica.port)
|
||||||
|
|
||||||
await disconnect_clients(c_master, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_replicaof_reject_on_load(df_factory, df_seeder_factory):
|
async def test_replicaof_reject_on_load(df_factory, df_seeder_factory):
|
||||||
master = df_factory.create()
|
master = df_factory.create()
|
||||||
|
@ -2045,10 +2002,6 @@ async def test_replicaof_reject_on_load(df_factory, df_seeder_factory):
|
||||||
await wait_available_async(c_replica, timeout=180)
|
await wait_available_async(c_replica, timeout=180)
|
||||||
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||||
|
|
||||||
await c_replica.close()
|
|
||||||
master.stop()
|
|
||||||
replica.stop()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_heartbeat_eviction_propagation(df_factory):
|
async def test_heartbeat_eviction_propagation(df_factory):
|
||||||
|
@ -2084,7 +2037,6 @@ async def test_heartbeat_eviction_propagation(df_factory):
|
||||||
keys_master = await c_master.execute_command("keys *")
|
keys_master = await c_master.execute_command("keys *")
|
||||||
keys_replica = await c_replica.execute_command("keys *")
|
keys_replica = await c_replica.execute_command("keys *")
|
||||||
assert set(keys_master) == set(keys_replica)
|
assert set(keys_master) == set(keys_replica)
|
||||||
await disconnect_clients(c_master, *[c_replica])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
@ -2123,8 +2075,6 @@ async def test_policy_based_eviction_propagation(df_factory, df_seeder_factory):
|
||||||
assert set(keys_replica).difference(keys_master) == set()
|
assert set(keys_replica).difference(keys_master) == set()
|
||||||
assert set(keys_master).difference(keys_replica) == set()
|
assert set(keys_master).difference(keys_replica) == set()
|
||||||
|
|
||||||
await disconnect_clients(c_master, *[c_replica])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_journal_doesnt_yield_issue_2500(df_factory, df_seeder_factory):
|
async def test_journal_doesnt_yield_issue_2500(df_factory, df_seeder_factory):
|
||||||
|
@ -2178,8 +2128,6 @@ async def test_journal_doesnt_yield_issue_2500(df_factory, df_seeder_factory):
|
||||||
keys_replica = await c_replica.execute_command("keys *")
|
keys_replica = await c_replica.execute_command("keys *")
|
||||||
assert set(keys_master) == set(keys_replica)
|
assert set(keys_master) == set(keys_replica)
|
||||||
|
|
||||||
await disconnect_clients(c_master, *[c_replica])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_saving_replica(df_factory):
|
async def test_saving_replica(df_factory):
|
||||||
|
@ -2208,8 +2156,6 @@ async def test_saving_replica(df_factory):
|
||||||
await save_task
|
await save_task
|
||||||
assert not await is_saving(c_replica)
|
assert not await is_saving(c_replica)
|
||||||
|
|
||||||
await disconnect_clients(c_master, *[c_replica])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_start_replicating_while_save(df_factory):
|
async def test_start_replicating_while_save(df_factory):
|
||||||
|
@ -2236,8 +2182,6 @@ async def test_start_replicating_while_save(df_factory):
|
||||||
await save_task
|
await save_task
|
||||||
assert not await is_saving(c_replica)
|
assert not await is_saving(c_replica)
|
||||||
|
|
||||||
await disconnect_clients(c_master, *[c_replica])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_user_acl_replication(df_factory):
|
async def test_user_acl_replication(df_factory):
|
||||||
|
@ -2325,8 +2269,6 @@ async def test_replica_reconnect(df_factory, break_conn):
|
||||||
await wait_available_async(c_replica)
|
await wait_available_async(c_replica)
|
||||||
assert await c_replica.execute_command("get k") == "6789"
|
assert await c_replica.execute_command("get k") == "6789"
|
||||||
|
|
||||||
await disconnect_clients(c_master, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_announce_ip_port(df_factory):
|
async def test_announce_ip_port(df_factory):
|
||||||
|
@ -2416,7 +2358,6 @@ async def test_master_stalled_disconnect(df_factory: DflyInstanceFactory):
|
||||||
await check_replica_connected() # still connected
|
await check_replica_connected() # still connected
|
||||||
await asyncio.sleep(1) # wait for the master to recognize it's being blocked
|
await asyncio.sleep(1) # wait for the master to recognize it's being blocked
|
||||||
await check_replica_disconnected()
|
await check_replica_disconnected()
|
||||||
df_factory.stop_all()
|
|
||||||
|
|
||||||
|
|
||||||
def download_dragonfly_release(version):
|
def download_dragonfly_release(version):
|
||||||
|
@ -2488,8 +2429,6 @@ async def test_replicate_old_master(
|
||||||
|
|
||||||
assert await c_replica.execute_command("get", "k1") == "v1"
|
assert await c_replica.execute_command("get", "k1") == "v1"
|
||||||
|
|
||||||
await disconnect_clients(c_master, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
# This Test was intorduced in response to a bug when replicating empty hashmaps (encoded as
|
# This Test was intorduced in response to a bug when replicating empty hashmaps (encoded as
|
||||||
# ziplists) created with HSET, HSETEX, HDEL and then replicated 2 times.
|
# ziplists) created with HSET, HSETEX, HDEL and then replicated 2 times.
|
||||||
|
@ -2588,8 +2527,6 @@ async def test_empty_hashmap_loading_bug(df_factory: DflyInstanceFactory):
|
||||||
await wait_for_replicas_state(c_replica)
|
await wait_for_replicas_state(c_replica)
|
||||||
assert await c_replica.execute_command(f"dbsize") == 0
|
assert await c_replica.execute_command(f"dbsize") == 0
|
||||||
|
|
||||||
await close_clients(c_master, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_replicating_mc_flags(df_factory):
|
async def test_replicating_mc_flags(df_factory):
|
||||||
master = df_factory.create(memcached_port=11211, proactor_threads=1)
|
master = df_factory.create(memcached_port=11211, proactor_threads=1)
|
||||||
|
@ -2627,8 +2564,6 @@ async def test_replicating_mc_flags(df_factory):
|
||||||
for i in range(1, 100):
|
for i in range(1, 100):
|
||||||
assert c_mc_replica.get(f"key{i}") == str.encode(f"value{i}")
|
assert c_mc_replica.get(f"key{i}") == str.encode(f"value{i}")
|
||||||
|
|
||||||
await close_clients(c_replica, c_master)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_double_take_over(df_factory, df_seeder_factory):
|
async def test_double_take_over(df_factory, df_seeder_factory):
|
||||||
|
@ -2671,8 +2606,6 @@ async def test_double_take_over(df_factory, df_seeder_factory):
|
||||||
|
|
||||||
assert await seeder.compare(capture, port=master.port)
|
assert await seeder.compare(capture, port=master.port)
|
||||||
|
|
||||||
await disconnect_clients(c_master, c_replica)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_replica_of_replica(df_factory):
|
async def test_replica_of_replica(df_factory):
|
||||||
|
@ -2692,5 +2625,3 @@ async def test_replica_of_replica(df_factory):
|
||||||
await c_replica2.execute_command(f"REPLICAOF localhost {replica.port}")
|
await c_replica2.execute_command(f"REPLICAOF localhost {replica.port}")
|
||||||
|
|
||||||
assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK"
|
assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK"
|
||||||
|
|
||||||
await disconnect_clients(c_replica, c_replica2)
|
|
||||||
|
|
|
@ -95,8 +95,6 @@ async def test_client_kill(df_factory):
|
||||||
with pytest.raises(Exception) as e_info:
|
with pytest.raises(Exception) as e_info:
|
||||||
await client_conn.ping()
|
await client_conn.ping()
|
||||||
|
|
||||||
await disconnect_clients(client, admin_client)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_scan(async_client: aioredis.Redis):
|
async def test_scan(async_client: aioredis.Redis):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -15,7 +15,7 @@ import random
|
||||||
from pymemcache.client.base import Client as MCClient
|
from pymemcache.client.base import Client as MCClient
|
||||||
|
|
||||||
from . import dfly_args
|
from . import dfly_args
|
||||||
from .utility import close_clients, wait_available_async, is_saving, tmp_file_name
|
from .utility import wait_available_async, is_saving, tmp_file_name
|
||||||
|
|
||||||
from .seeder import StaticSeeder
|
from .seeder import StaticSeeder
|
||||||
|
|
||||||
|
@ -643,5 +643,3 @@ async def test_mc_flags_saving(memcached_client: MCClient, async_client: aioredi
|
||||||
|
|
||||||
await check_flag("key1", 2)
|
await check_flag("key1", 2)
|
||||||
await check_flag("key2", 123456)
|
await check_flag("key2", 123456)
|
||||||
|
|
||||||
await close_clients(async_client)
|
|
||||||
|
|
|
@ -658,14 +658,6 @@ class DflySeederFactory:
|
||||||
return DflySeeder(log_file=self.log_file, **kwargs)
|
return DflySeeder(log_file=self.log_file, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
async def disconnect_clients(*clients):
|
|
||||||
await asyncio.gather(*(c.connection_pool.disconnect() for c in clients))
|
|
||||||
|
|
||||||
|
|
||||||
async def close_clients(*clients):
|
|
||||||
await asyncio.gather(*(c.close() for c in clients))
|
|
||||||
|
|
||||||
|
|
||||||
def gen_ca_cert(ca_key_path, ca_cert_path):
|
def gen_ca_cert(ca_key_path, ca_cert_path):
|
||||||
# We first need to generate the tls certificates to be used by the server
|
# We first need to generate the tls certificates to be used by the server
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue