From b19f72201194aceb95d70f4192b4f87416e2317b Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Mon, 30 Sep 2024 09:54:41 +0300 Subject: [PATCH] chore: do not close connections at the end of pytest (#3811) A common case is that we need to clean up a connection before we exit a test via .close() method. This is needed because otherwise the connection will raise a warning that it is left unclosed. However, remembering to call .close() at each connection at the end of the test is cumbersome! Luckily, fixtures in python can be marked as async which allow us to: * cache all clients created by DflyInstance.client() * clean them all at the end of the fixture in one go --------- Signed-off-by: kostas --- tests/dragonfly/acl_family_test.py | 23 --------- tests/dragonfly/cluster_mgr_test.py | 3 +- tests/dragonfly/cluster_test.py | 51 ++------------------ tests/dragonfly/conftest.py | 23 ++------- tests/dragonfly/connection_test.py | 5 -- tests/dragonfly/instance.py | 24 ++++++++-- tests/dragonfly/memory_test.py | 2 - tests/dragonfly/replication_test.py | 69 --------------------------- tests/dragonfly/server_family_test.py | 2 - tests/dragonfly/snapshot_test.py | 4 +- tests/dragonfly/utility.py | 8 ---- 11 files changed, 31 insertions(+), 183 deletions(-) diff --git a/tests/dragonfly/acl_family_test.py b/tests/dragonfly/acl_family_test.py index 71e49d8de..79890fbde 100644 --- a/tests/dragonfly/acl_family_test.py +++ b/tests/dragonfly/acl_family_test.py @@ -225,9 +225,6 @@ async def test_acl_cat_commands_multi_exec_squash(df_factory): with pytest.raises(redis.exceptions.NoPermissionError): await client.execute_command(f"SET x{x} {x}") - await admin_client.close() - await client.close() - @pytest.mark.asyncio async def test_acl_deluser(df_server): @@ -248,8 +245,6 @@ async def test_acl_deluser(df_server): assert await client.execute_command("ACL WHOAMI") == "User is default" - await close_clients(admin_client, client) - script = """ for i = 1, 100000 do @@ -279,8 +274,6 @@ async def test_acl_del_user_while_running_lua_script(df_server): res = await admin_client.get(f"key{i}") assert res == "100000" - await admin_client.close() - @pytest.mark.asyncio @pytest.mark.skip("Non deterministic") @@ -299,9 +292,6 @@ async def test_acl_with_long_running_script(df_server): res = await admin_client.get(f"key{i}") assert res == "100000" - await client.close() - await admin_client.close() - def create_temp_file(content, tmp_dir): file = tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False) @@ -325,8 +315,6 @@ async def test_bad_acl_file(df_factory, tmp_dir): with pytest.raises(redis.exceptions.ResponseError): await client.execute_command("ACL LOAD") - await client.close() - @pytest.mark.asyncio @dfly_args({"port": 1111}) @@ -381,8 +369,6 @@ async def test_good_acl_file(df_factory, tmp_dir): assert "user vlad off ~foo ~bar* resetchannels -@all +@string" in result assert "user default on nopass ~* &* +@all" in result - await client.close() - @pytest.mark.asyncio async def test_acl_log(async_client): @@ -463,8 +449,6 @@ async def test_require_pass(df_factory): res = await client.execute_command("GET foo") assert res == "44" - await client.close() - @pytest.mark.asyncio @dfly_args({"port": 1111, "requirepass": "temp"}) @@ -480,7 +464,6 @@ async def test_require_pass_with_acl_file_order(df_factory, tmp_dir): client = aioredis.Redis(username="default", password="jordan", port=df.port) assert await client.set("foo", "bar") - await client.close() @pytest.mark.asyncio @@ -608,8 +591,6 @@ async def test_namespaces(df_server): assert await roman.execute_command("AUTH roman roman_pass") == "OK" assert await roman.execute_command("GET foo") == None - await close_clients(admin, adi, shahar, roman) - @pytest.mark.asyncio async def test_default_user_bug(df_server): @@ -623,8 +604,6 @@ async def test_default_user_bug(df_server): with pytest.raises(redis.exceptions.ResponseError): await client.execute_command("SET foo bar") - await client.close() - @pytest.mark.asyncio async def test_auth_resp3_bug(df_factory): @@ -642,8 +621,6 @@ async def test_auth_resp3_bug(df_factory): assert res["role"] == "master" assert res["id"] == 1 - await client.close() - @pytest.mark.asyncio async def test_acl_pub_sub_auth(df_factory): diff --git a/tests/dragonfly/cluster_mgr_test.py b/tests/dragonfly/cluster_mgr_test.py index 1abd47395..c120f21c4 100644 --- a/tests/dragonfly/cluster_mgr_test.py +++ b/tests/dragonfly/cluster_mgr_test.py @@ -161,5 +161,4 @@ async def test_cluster_mgr(df_factory): for i in range(NODES): assert run_cluster_mgr(["--action=detach", f"--target_port={replicas[i].port}"]) await check_cluster_data(client) - - await close_clients(client, *replica_clients, c_master0) + await client.close() diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 6fd63fe2c..f631084dc 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -332,8 +332,6 @@ async def test_emulated_cluster_with_replicas(df_factory): }, } - await close_clients(c_master, *c_replicas) - @dfly_args({"cluster_mode": "emulated"}) async def test_cluster_info(async_client): @@ -391,8 +389,6 @@ async def test_cluster_node_id(df_factory: DflyInstanceFactory): conn = node.client() assert "inigo montoya" == await get_node_id(conn) - await close_clients(conn) - @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory): @@ -520,8 +516,6 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory): assert (await c_nodes[0].get("KEY0")) == "value" assert await c_nodes[1].execute_command("DBSIZE") == 0 - await close_clients(*c_nodes, *c_nodes_admin) - # Tests that master commands to the replica are applied regardless of slot ownership @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) @@ -631,8 +625,6 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto assert await c_master.execute_command("dbsize") == 0 assert await c_replica.execute_command("dbsize") == 0 - await close_clients(c_master, c_master_admin, c_replica, c_replica_admin) - @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceFactory): @@ -740,8 +732,6 @@ async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceF assert await c_master.execute_command("dbsize") == (100_000 - slot_0_size) assert await c_replica.execute_command("dbsize") == (100_000 - slot_0_size) - await close_clients(c_master, c_master_admin, c_replica, c_replica_admin) - @dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "admin_port": 30001}) async def test_cluster_blocking_command(df_server): @@ -786,8 +776,6 @@ async def test_cluster_blocking_command(df_server): await v2 assert "MOVED" in str(e_info.value) - await close_clients(c_master, c_master_admin) - @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_blocking_commands_cancel(df_factory, df_seeder_factory): @@ -827,8 +815,6 @@ async def test_blocking_commands_cancel(df_factory, df_seeder_factory): await list_task assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value) - await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) - @pytest.mark.parametrize("set_cluster_node_id", [True, False]) @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) @@ -847,7 +833,7 @@ async def test_cluster_native_client( for i in range(3) ] df_factory.start_all(masters) - c_masters = [aioredis.Redis(port=master.port) for master in masters] + c_masters = [master.client() for master in masters] c_masters_admin = [master.admin_client() for master in masters] master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin)) @@ -935,7 +921,7 @@ async def test_cluster_native_client( seeder = df_seeder_factory.create(port=masters[0].port, cluster_mode=True) await seeder.run(target_deviation=0.1) - client = aioredis.RedisCluster(decode_responses=True, host="localhost", port=masters[0].port) + client = masters[0].cluster_client() assert await client.set("key0", "value") == True assert await client.get("key0") == "value" @@ -1025,7 +1011,6 @@ async def test_cluster_native_client( await push_config(config, c_masters_admin + c_replicas_admin) await test_random_keys() - await close_clients(client, *c_masters, *c_masters_admin, *c_replicas, *c_replicas_admin) @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) @@ -1073,7 +1058,6 @@ async def test_config_consistency(df_factory: DflyInstanceFactory): await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) await check_for_no_state_status([node.admin_client for node in nodes]) - await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) @@ -1129,8 +1113,6 @@ async def test_cluster_flushall_during_migration( assert await nodes[0].client.dbsize() == 0 - await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) - @pytest.mark.parametrize("interrupt", [False, True]) @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) @@ -1180,7 +1162,6 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt assert 0 in slots[0] and 9000 in slots[0] assert 9001 in slots[1] and 16383 in slots[1] - await close_clients(*[n.client for n in nodes], *[n.admin_client for n in nodes]) return await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED") @@ -1214,7 +1195,6 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt assert await nodes[1].client.execute_command("DBSIZE") == 19 await check_for_no_state_status([node.admin_client for node in nodes]) - await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) @dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"}) @@ -1265,8 +1245,6 @@ async def test_migration_with_key_ttl(df_factory): assert await nodes[1].client.execute_command("ttl k_without_ttl") == -1 assert await nodes[1].client.execute_command("stick k_sticky") == 0 - await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) - @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_network_disconnect_during_migration(df_factory, df_seeder_factory): @@ -1314,8 +1292,6 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory capture = await seeder.capture() assert await seeder.compare(capture, nodes[1].instance.port) - await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) - @pytest.mark.parametrize( "node_count, segments, keys", @@ -1366,9 +1342,7 @@ async def test_cluster_fuzzymigration( # Start ten counters counter_keys = [f"_counter{i}" for i in range(10)] - counter_connections = [ - aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port) for _ in range(10) - ] + counter_connections = [nodes[0].instance.cluster_client() for _ in range(10)] counters = [ asyncio.create_task(list_counter(key, conn)) for key, conn in zip(counter_keys, counter_connections) @@ -1454,7 +1428,7 @@ async def test_cluster_fuzzymigration( await counter # Check counter consistency - cluster_client = aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port) + cluster_client = nodes[0].instance.cluster_client() for key in counter_keys: counter_list = await cluster_client.lrange(key, 0, -1) for i, j in zip(counter_list, counter_list[1:]): @@ -1464,9 +1438,6 @@ async def test_cluster_fuzzymigration( assert await seeder.compare(capture, nodes[0].instance.port) await asyncio.gather(*[c.close() for c in counter_connections]) - await close_clients( - cluster_client, *[node.admin_client for node in nodes], *[node.client for node in nodes] - ) @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) @@ -1517,8 +1488,6 @@ async def test_cluster_config_reapply(df_factory: DflyInstanceFactory): for i in range(SIZE): assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}") - await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) - @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_replication_migration( @@ -1597,10 +1566,9 @@ async def test_cluster_replication_migration( assert await seeder.compare(r2_capture, r1_node.instance.port) assert await seeder.compare(r1_capture, r2_node.instance.port) - await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) - @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +@pytest.mark.asyncio async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory): """Check data migration from one node to another.""" instances = [ @@ -1660,8 +1628,6 @@ async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory): for i in range(SIZE): assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}") - await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) - def parse_lag(replication_info: str): lags = re.findall("lag=([0-9]+)\r\n", replication_info) @@ -1748,8 +1714,6 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact capture = await seeder.capture() assert await seeder.compare(capture, replica.port) - await disconnect_clients(*c_nodes, c_replica) - async def await_stable_sync(m_client: aioredis.Redis, replica_port, timeout=10): start = time.time() @@ -1863,7 +1827,6 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_ capture = await seeder.capture() assert await seeder.compare(capture, replica.port) - await disconnect_clients(*c_nodes, c_replica) await proxy.close(proxy_task) @@ -1940,8 +1903,6 @@ async def test_replicate_redis_cluster(redis_cluster, df_factory, df_seeder_fact capture = await seeder.capture() assert await seeder.compare(capture, replica.port) - await disconnect_clients(c_replica, *node_clients) - @dfly_args({"proactor_threads": 4}) async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_seeder_factory): @@ -2030,5 +1991,3 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_ await c_replica.execute_command("REPLICAOF NO ONE") capture = await seeder.capture() assert await seeder.compare(capture, replica.port) - - await disconnect_clients(c_replica, *node_clients) diff --git a/tests/dragonfly/conftest.py b/tests/dragonfly/conftest.py index 27a93e989..cca0f1186 100644 --- a/tests/dragonfly/conftest.py +++ b/tests/dragonfly/conftest.py @@ -112,8 +112,8 @@ def parse_args(args: List[str]) -> Dict[str, Union[str, None]]: return args_dict -@pytest.fixture(scope="function", params=[{}]) -def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory: +@pytest_asyncio.fixture(scope="function", params=[{}]) +async def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory: """ Create an instance factory with supplied params. """ @@ -142,7 +142,7 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory: factory = DflyInstanceFactory(params, args) yield factory - factory.stop_all() + await factory.stop_all() @pytest.fixture(scope="function") @@ -185,23 +185,6 @@ def connection(df_server: DflyInstance): return redis.Connection(port=df_server.port) -# @pytest.fixture(scope="class") -# def sync_pool(df_server: DflyInstance): -# pool = redis.ConnectionPool(decode_responses=True, port=df_server.port) -# yield pool -# pool.disconnect() - - -# @pytest.fixture(scope="class") -# def client(sync_pool): -# """ -# Return a client to the default instance with all entries flushed. -# """ -# client = redis.Redis(connection_pool=sync_pool) -# client.flushall() -# return client - - @pytest.fixture(scope="function") def cluster_client(df_server): """ diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 83f5bab7f..eb9aa11ac 100755 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -646,7 +646,6 @@ async def test_reject_non_tls_connections_on_tls(with_tls_server_args, df_factor client = server.admin_client(password="XXX") assert await client.dbsize() == 0 - await client.close() async def test_tls_insecure(with_ca_tls_server_args, with_tls_client_args, df_factory): @@ -655,7 +654,6 @@ async def test_tls_insecure(with_ca_tls_server_args, with_tls_client_args, df_fa client = aioredis.Redis(port=server.port, **with_tls_client_args, ssl_cert_reqs=None) assert await client.dbsize() == 0 - await client.close() async def test_tls_full_auth(with_ca_tls_server_args, with_ca_tls_client_args, df_factory): @@ -664,7 +662,6 @@ async def test_tls_full_auth(with_ca_tls_server_args, with_ca_tls_client_args, d client = aioredis.Redis(port=server.port, **with_ca_tls_client_args) assert await client.dbsize() == 0 - await client.close() async def test_tls_reject( @@ -680,7 +677,6 @@ async def test_tls_reject( client = server.client(**with_tls_client_args) with pytest.raises(redis_conn_error): await client.ping() - await client.close() @dfly_args({"proactor_threads": "4", "pipeline_squash": 10}) @@ -890,7 +886,6 @@ async def test_tls_when_read_write_is_interleaved( # This deadlocks client = aioredis.Redis(port=server.port, **with_ca_tls_client_args) await client.execute_command("GET foo") - await client.close() async def test_lib_name_ver(async_client: aioredis.Redis): diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index 127646d86..019d27558 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -12,6 +12,7 @@ import psutil import itertools from prometheus_client.parser import text_string_to_metric_families from redis.asyncio import Redis as RedisClient +from redis.asyncio import RedisCluster as RedisCluster import signal @@ -95,6 +96,7 @@ class DflyInstance: self.log_files: List[str] = [] self.dynamic_port = False self.sed_proc = None + self.clients = [] if self.params.existing_port: self._port = self.params.existing_port @@ -126,16 +128,31 @@ class DflyInstance: def client(self, *args, **kwargs) -> RedisClient: host = "localhost" if self["bind"] is None else self["bind"] - return RedisClient(host=host, port=self.port, decode_responses=True, *args, **kwargs) + client = RedisClient(host=host, port=self.port, decode_responses=True, *args, **kwargs) + self.clients.append(client) + return client def admin_client(self, *args, **kwargs) -> RedisClient: - return RedisClient( + client = RedisClient( port=self.admin_port, single_connection_client=True, decode_responses=True, *args, **kwargs, ) + self.clients.append(client) + return client + + def cluster_client(self, *args, **kwargs) -> RedisCluster: + client = RedisCluster( + host="localhost", port=self.port, decode_responses=True, *args, **kwargs + ) + self.clients.append(client) + return client + + async def close_clients(self): + for client in self.clients: + await client.close() def __enter__(self): self.start() @@ -422,9 +439,10 @@ class DflyInstanceFactory: for instance in instances: instance._wait_for_server() - def stop_all(self): + async def stop_all(self): """Stop all launched instances.""" for instance in self.instances: + await instance.close_clients() instance.stop() def __repr__(self) -> str: diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index f2f82587e..ee0c6ff26 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -47,8 +47,6 @@ async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements): assert delta > 0 assert delta < max_unaccounted - await disconnect_clients(client) - @pytest.mark.asyncio @dfly_args( diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index c3c6f2c89..30d3672c0 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -132,8 +132,6 @@ async def test_replication_all( # Check data after stable state stream await check() - await disconnect_clients(c_master, *c_replicas) - async def check_replica_finished_exec(c_replica: aioredis.Redis, m_offset): role = await c_replica.role() @@ -304,7 +302,6 @@ async def test_disconnect_replica( logging.debug("Check master survived all disconnects") assert await c_master.ping() - await c_master.close() """ @@ -486,7 +483,6 @@ async def test_cancel_replication_immediately(df_factory, df_seeder_factory: Dfl assert await seeder.compare(capture, replica.port) ping_job.cancel() - await c_replica.close() """ @@ -902,8 +898,6 @@ async def test_scripts(df_factory, t_master, t_replicas, num_ops, num_keys, num_ l = await c_replica.lrange(k, 0, -1) assert l == [f"{j}"] * num_ops - await close_clients(c_master, *c_replicas) - @dfly_args({"proactor_threads": 4}) @pytest.mark.asyncio @@ -1145,9 +1139,6 @@ async def test_flushall_in_full_sync(df_factory): new_syncid, _ = await c_replica.execute_command("DEBUG REPLICA OFFSET") assert new_syncid != syncid - await c_master.close() - await c_replica.close() - """ Test read-only scripts work with replication. EVAL_RO and the 'no-writes' flags are currently not supported. @@ -1250,8 +1241,6 @@ async def test_take_over_counters(df_factory, master_threads, replica_threads): replicated_value = await c1.get(key) assert client_value == int(replicated_value) - await disconnect_clients(c_master, c1, c_blocking, c2, c3) - @pytest.mark.parametrize("master_threads, replica_threads", take_over_cases) @pytest.mark.asyncio @@ -1308,8 +1297,6 @@ async def test_take_over_seeder( capture = await seeder.capture(port=master.port) assert await seeder.compare(capture, port=replica.port) - await disconnect_clients(c_master, c_replica) - @pytest.mark.parametrize("master_threads, replica_threads", [[4, 4]]) @pytest.mark.asyncio @@ -1345,8 +1332,6 @@ async def test_take_over_read_commands(df_factory, master_threads, replica_threa assert await c_replica.execute_command("role") == ["master", []] await promt_task - await disconnect_clients(c_master, c_replica) - @pytest.mark.asyncio async def test_take_over_timeout(df_factory, df_seeder_factory): @@ -1388,8 +1373,6 @@ async def test_take_over_timeout(df_factory, df_seeder_factory): "online", ] - await disconnect_clients(c_master, c_replica) - # 1. Number of master threads # 2. Number of threads for each replica @@ -1437,8 +1420,6 @@ async def test_no_tls_on_admin_port( # 3. Verify that replica dbsize == debug populate key size -- replication works db_size = await c_replica.execute_command("DBSIZE") assert 100 == db_size - await c_replica.close() - await c_master.close() # 1. Number of master threads @@ -1509,8 +1490,6 @@ async def test_tls_replication( db_size = await c_replica.execute_command("DBSIZE") assert 101 == db_size - await c_replica.close() - await c_master.close() await proxy.close(proxy_task) @@ -1681,15 +1660,12 @@ async def test_df_crash_on_memcached_error(df_factory): c_replica = replica.client() await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) - await c_replica.close() memcached_client = pymemcache.Client(f"127.0.0.1:{replica.mc_port}") with pytest.raises(pymemcache.exceptions.MemcacheServerError): memcached_client.set("key", "data", noreply=False) - await c_master.close() - @pytest.mark.asyncio async def test_df_crash_on_replicaof_flag(df_factory): @@ -1713,9 +1689,6 @@ async def test_df_crash_on_replicaof_flag(df_factory): res = await c_replica.execute_command("DBSIZE") assert res == 0 - master.stop() - replica.stop() - async def test_network_disconnect(df_factory, df_seeder_factory): master = df_factory.create(proactor_threads=6) @@ -1746,9 +1719,6 @@ async def test_network_disconnect(df_factory, df_seeder_factory): finally: await proxy.close(task) - master.stop() - replica.stop() - async def test_network_disconnect_active_stream(df_factory, df_seeder_factory): master = df_factory.create(proactor_threads=4, shard_repl_backlog_len=4000) @@ -1787,9 +1757,6 @@ async def test_network_disconnect_active_stream(df_factory, df_seeder_factory): finally: await proxy.close(task) - master.stop() - replica.stop() - async def test_network_disconnect_small_buffer(df_factory, df_seeder_factory): master = df_factory.create(proactor_threads=4, shard_repl_backlog_len=1) @@ -1832,9 +1799,6 @@ async def test_network_disconnect_small_buffer(df_factory, df_seeder_factory): finally: await proxy.close(task) - master.stop() - replica.stop() - # Partial replication is currently not implemented so the following does not work # assert master.is_in_logs("Partial sync requested from stale LSN") @@ -1883,9 +1847,6 @@ async def test_replica_reconnections_after_network_disconnect(df_factory, df_see finally: await proxy.close(task) - master.stop() - replica.stop() - async def test_search(df_factory): master = df_factory.create(proactor_threads=4) @@ -1968,8 +1929,6 @@ async def test_search_with_stream(df_factory: DflyInstanceFactory): ["name", "new-secret"], ] - await close_clients(c_master, c_replica) - # @pytest.mark.slow @pytest.mark.asyncio @@ -2014,8 +1973,6 @@ async def test_client_pause_with_replica(df_factory, df_seeder_factory): capture = await seeder.capture(port=master.port) assert await seeder.compare(capture, port=replica.port) - await disconnect_clients(c_master, c_replica) - async def test_replicaof_reject_on_load(df_factory, df_seeder_factory): master = df_factory.create() @@ -2045,10 +2002,6 @@ async def test_replicaof_reject_on_load(df_factory, df_seeder_factory): await wait_available_async(c_replica, timeout=180) await c_replica.execute_command(f"REPLICAOF localhost {master.port}") - await c_replica.close() - master.stop() - replica.stop() - @pytest.mark.asyncio async def test_heartbeat_eviction_propagation(df_factory): @@ -2084,7 +2037,6 @@ async def test_heartbeat_eviction_propagation(df_factory): keys_master = await c_master.execute_command("keys *") keys_replica = await c_replica.execute_command("keys *") assert set(keys_master) == set(keys_replica) - await disconnect_clients(c_master, *[c_replica]) @pytest.mark.asyncio @@ -2123,8 +2075,6 @@ async def test_policy_based_eviction_propagation(df_factory, df_seeder_factory): assert set(keys_replica).difference(keys_master) == set() assert set(keys_master).difference(keys_replica) == set() - await disconnect_clients(c_master, *[c_replica]) - @pytest.mark.asyncio async def test_journal_doesnt_yield_issue_2500(df_factory, df_seeder_factory): @@ -2178,8 +2128,6 @@ async def test_journal_doesnt_yield_issue_2500(df_factory, df_seeder_factory): keys_replica = await c_replica.execute_command("keys *") assert set(keys_master) == set(keys_replica) - await disconnect_clients(c_master, *[c_replica]) - @pytest.mark.asyncio async def test_saving_replica(df_factory): @@ -2208,8 +2156,6 @@ async def test_saving_replica(df_factory): await save_task assert not await is_saving(c_replica) - await disconnect_clients(c_master, *[c_replica]) - @pytest.mark.asyncio async def test_start_replicating_while_save(df_factory): @@ -2236,8 +2182,6 @@ async def test_start_replicating_while_save(df_factory): await save_task assert not await is_saving(c_replica) - await disconnect_clients(c_master, *[c_replica]) - @pytest.mark.asyncio async def test_user_acl_replication(df_factory): @@ -2325,8 +2269,6 @@ async def test_replica_reconnect(df_factory, break_conn): await wait_available_async(c_replica) assert await c_replica.execute_command("get k") == "6789" - await disconnect_clients(c_master, c_replica) - @pytest.mark.asyncio async def test_announce_ip_port(df_factory): @@ -2416,7 +2358,6 @@ async def test_master_stalled_disconnect(df_factory: DflyInstanceFactory): await check_replica_connected() # still connected await asyncio.sleep(1) # wait for the master to recognize it's being blocked await check_replica_disconnected() - df_factory.stop_all() def download_dragonfly_release(version): @@ -2488,8 +2429,6 @@ async def test_replicate_old_master( assert await c_replica.execute_command("get", "k1") == "v1" - await disconnect_clients(c_master, c_replica) - # This Test was intorduced in response to a bug when replicating empty hashmaps (encoded as # ziplists) created with HSET, HSETEX, HDEL and then replicated 2 times. @@ -2588,8 +2527,6 @@ async def test_empty_hashmap_loading_bug(df_factory: DflyInstanceFactory): await wait_for_replicas_state(c_replica) assert await c_replica.execute_command(f"dbsize") == 0 - await close_clients(c_master, c_replica) - async def test_replicating_mc_flags(df_factory): master = df_factory.create(memcached_port=11211, proactor_threads=1) @@ -2627,8 +2564,6 @@ async def test_replicating_mc_flags(df_factory): for i in range(1, 100): assert c_mc_replica.get(f"key{i}") == str.encode(f"value{i}") - await close_clients(c_replica, c_master) - @pytest.mark.asyncio async def test_double_take_over(df_factory, df_seeder_factory): @@ -2671,8 +2606,6 @@ async def test_double_take_over(df_factory, df_seeder_factory): assert await seeder.compare(capture, port=master.port) - await disconnect_clients(c_master, c_replica) - @pytest.mark.asyncio async def test_replica_of_replica(df_factory): @@ -2692,5 +2625,3 @@ async def test_replica_of_replica(df_factory): await c_replica2.execute_command(f"REPLICAOF localhost {replica.port}") assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK" - - await disconnect_clients(c_replica, c_replica2) diff --git a/tests/dragonfly/server_family_test.py b/tests/dragonfly/server_family_test.py index 571d0f4dd..6d1451e0d 100644 --- a/tests/dragonfly/server_family_test.py +++ b/tests/dragonfly/server_family_test.py @@ -95,8 +95,6 @@ async def test_client_kill(df_factory): with pytest.raises(Exception) as e_info: await client_conn.ping() - await disconnect_clients(client, admin_client) - async def test_scan(async_client: aioredis.Redis): """ diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index e44dd29f7..a85c0f62b 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -15,7 +15,7 @@ import random from pymemcache.client.base import Client as MCClient from . import dfly_args -from .utility import close_clients, wait_available_async, is_saving, tmp_file_name +from .utility import wait_available_async, is_saving, tmp_file_name from .seeder import StaticSeeder @@ -643,5 +643,3 @@ async def test_mc_flags_saving(memcached_client: MCClient, async_client: aioredi await check_flag("key1", 2) await check_flag("key2", 123456) - - await close_clients(async_client) diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index 522f64031..59d6a475c 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -658,14 +658,6 @@ class DflySeederFactory: return DflySeeder(log_file=self.log_file, **kwargs) -async def disconnect_clients(*clients): - await asyncio.gather(*(c.connection_pool.disconnect() for c in clients)) - - -async def close_clients(*clients): - await asyncio.gather(*(c.close() for c in clients)) - - def gen_ca_cert(ca_key_path, ca_cert_path): # We first need to generate the tls certificates to be used by the server