diff --git a/.github/actions/regression-tests/action.yml b/.github/actions/regression-tests/action.yml index c742eeeac..6901a8f6e 100644 --- a/.github/actions/regression-tests/action.yml +++ b/.github/actions/regression-tests/action.yml @@ -37,7 +37,11 @@ runs: export DRAGONFLY_PATH="${GITHUB_WORKSPACE}/${{inputs.build-folder-name}}/${{inputs.dfly-executable}}" export UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1 # to crash on errors - timeout 20m pytest -m "${{inputs.filter}}" --json-report --json-report-file=report.json dragonfly --ignore=dragonfly/replication_test.py --log-cli-level=INFO || code=$?; if [[ $code -ne 0 ]]; then echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"; exit 1; fi + timeout 20m pytest -m "${{inputs.filter}}" --color=yes --json-report --json-report-file=report.json dragonfly --ignore=dragonfly/replication_test.py --log-cli-level=INFO || code=$? + if [[ $code -ne 0 ]]; then + echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"; + exit 1 + fi - name: Run PyTests replication test id: second @@ -50,7 +54,13 @@ runs: export DRAGONFLY_PATH="${GITHUB_WORKSPACE}/${{inputs.build-folder-name}}/${{inputs.dfly-executable}}" run_pytest_with_args() { - timeout 20m pytest -m "${{inputs.filter}}" --json-report --json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO --df alsologtostderr $1 $2 || code=$?; if [[ $code -ne 0 ]]; then echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"; exit 1; fi + timeout 20m pytest -m "${{inputs.filter}}" --color=yes --json-report \ + --json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO \ + --df alsologtostderr $1 $2 || code=$? + if [[ $code -ne 0 ]]; then + echo "TIMEDOUT=1">> "$GITHUB_OUTPUT" + exit 1 + fi } (run_pytest_with_args --df enable_multi_shard_sync=true) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 34d6d36c6..416395d16 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -4,23 +4,27 @@ import redis from redis import asyncio as aioredis import asyncio +from .instance import DflyInstanceFactory from .utility import * from .replication_test import check_all_replicas_finished + from . import dfly_args BASE_PORT = 30001 async def push_config(config, admin_connections): - print("Pushing config ", config) - await asyncio.gather( + logging.debug("Pushing config %s", config) + res = await asyncio.gather( *(c_admin.execute_command("DFLYCLUSTER", "CONFIG", config) for c_admin in admin_connections) ) + assert all([r == "OK" for r in res]) async def get_node_id(admin_connection): id = await admin_connection.execute_command("DFLYCLUSTER MYID") - return id.decode() + assert isinstance(id, str) + return id @dfly_args({}) @@ -168,7 +172,7 @@ Also add keys to each of them that are *not* moved, and see that they are unaffe @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) -async def test_cluster_slot_ownership_changes(df_local_factory): +async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFactory): # Start and configure cluster with 2 nodes nodes = [ df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) @@ -177,8 +181,8 @@ async def test_cluster_slot_ownership_changes(df_local_factory): df_local_factory.start_all(nodes) - c_nodes = [aioredis.Redis(port=node.port) for node in nodes] - c_nodes_admin = [aioredis.Redis(port=node.admin_port) for node in nodes] + c_nodes = [node.client() for node in nodes] + c_nodes_admin = [node.admin_client() for node in nodes] node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin)) @@ -233,7 +237,7 @@ async def test_cluster_slot_ownership_changes(df_local_factory): assert await c_nodes[0].execute_command("DBSIZE") == 2 # Make sure that node0 owns "KEY0" - assert (await c_nodes[0].get("KEY0")).decode() == "value" + assert (await c_nodes[0].get("KEY0")) == "value" # Make sure that "KEY1" is not owned by node1 try: @@ -255,7 +259,7 @@ async def test_cluster_slot_ownership_changes(df_local_factory): # node0 should have removed "KEY1" as it no longer owns it assert await c_nodes[0].execute_command("DBSIZE") == 1 # node0 should still own "KEY0" though - assert (await c_nodes[0].get("KEY0")).decode() == "value" + assert (await c_nodes[0].get("KEY0")) == "value" # node1 should still have "KEY2" assert await c_nodes[1].execute_command("DBSIZE") == 1 @@ -291,7 +295,7 @@ async def test_cluster_slot_ownership_changes(df_local_factory): await push_config(config, c_nodes_admin) assert await c_nodes[0].execute_command("DBSIZE") == 1 - assert (await c_nodes[0].get("KEY0")).decode() == "value" + assert (await c_nodes[0].get("KEY0")) == "value" assert await c_nodes[1].execute_command("DBSIZE") == 0 @@ -337,7 +341,7 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory): await c_master.set("key", "value") await c_replica.execute_command("REPLICAOF", "localhost", master.port) await check_all_replicas_finished([c_replica], c_master) - assert (await c_replica.get("key")).decode() == "value" + assert (await c_replica.get("key")) == "value" assert await c_replica.execute_command("dbsize") == 1 # Tell the replica that it and the master no longer own any data, but don't tell that to the @@ -405,18 +409,18 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) -async def test_cluster_flush_slots_after_config_change(df_local_factory): +async def test_cluster_flush_slots_after_config_change(df_local_factory: DflyInstanceFactory): # Start and configure cluster with 1 master and 1 replica, both own all slots master = df_local_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000) replica = df_local_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001) df_local_factory.start_all([master, replica]) - c_master = aioredis.Redis(port=master.port) - c_master_admin = aioredis.Redis(port=master.admin_port) + c_master = master.client() + c_master_admin = master.admin_client() master_id = await get_node_id(c_master_admin) - c_replica = aioredis.Redis(port=replica.port) - c_replica_admin = aioredis.Redis(port=replica.admin_port) + c_replica = replica.client() + c_replica_admin = replica.admin_client() replica_id = await get_node_id(c_replica_admin) config = f""" @@ -512,7 +516,7 @@ async def test_cluster_flush_slots_after_config_change(df_local_factory): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) -async def test_cluster_native_client(df_local_factory): +async def test_cluster_native_client(df_local_factory: DflyInstanceFactory): # Start and configure cluster with 3 masters and 3 replicas masters = [ df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) @@ -520,7 +524,7 @@ async def test_cluster_native_client(df_local_factory): ] df_local_factory.start_all(masters) c_masters = [aioredis.Redis(port=master.port) for master in masters] - c_masters_admin = [aioredis.Redis(port=master.admin_port) for master in masters] + c_masters_admin = [master.admin_client() for master in masters] master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin)) replicas = [ @@ -528,8 +532,8 @@ async def test_cluster_native_client(df_local_factory): for i in range(3) ] df_local_factory.start_all(replicas) - c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas] - c_replicas_admin = [aioredis.Redis(port=replica.admin_port) for replica in replicas] + c_replicas = [replica.client() for replica in replicas] + c_replicas_admin = [replica.admin_client() for replica in replicas] replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas_admin)) config = f""" diff --git a/tests/dragonfly/config_test.py b/tests/dragonfly/config_test.py index 68316957d..86d328cab 100644 --- a/tests/dragonfly/config_test.py +++ b/tests/dragonfly/config_test.py @@ -9,7 +9,7 @@ async def test_maxclients(df_factory): # Needs some authentication with df_factory.create(port=1111, maxclients=1, admin_port=1112) as server: async with server.client() as client1: - assert [b"maxclients", b"1"] == await client1.execute_command("CONFIG GET maxclients") + assert ["maxclients", "1"] == await client1.execute_command("CONFIG GET maxclients") with pytest.raises(redis.exceptions.ConnectionError): async with server.client() as client2: @@ -20,6 +20,6 @@ async def test_maxclients(df_factory): await admin_client.get("test") await client1.execute_command("CONFIG SET maxclients 3") - assert [b"maxclients", b"3"] == await client1.execute_command("CONFIG GET maxclients") + assert ["maxclients", "3"] == await client1.execute_command("CONFIG GET maxclients") async with server.client() as client2: await client2.get("test") diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index 58a783185..58a75f71b 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -91,10 +91,16 @@ class DflyInstance: assert self.proc == None def client(self, *args, **kwargs) -> RedisClient: - return RedisClient(port=self.port, *args, **kwargs) + return RedisClient(port=self.port, decode_responses=True, *args, **kwargs) def admin_client(self, *args, **kwargs) -> RedisClient: - return RedisClient(port=self.admin_port, *args, **kwargs) + return RedisClient( + port=self.admin_port, + single_connection_client=True, + decode_responses=True, + *args, + **kwargs, + ) def __enter__(self): self.start() diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 91a7ccef3..fadba85af 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -3,10 +3,9 @@ from itertools import chain, repeat import re import pytest import asyncio -import redis from redis import asyncio as aioredis from .utility import * -from .instance import DflyInstanceFactory +from .instance import DflyInstanceFactory, DflyInstance from . import dfly_args import pymemcache import logging @@ -42,7 +41,12 @@ replication_cases = [ @pytest.mark.slow @pytest.mark.parametrize("t_master, t_replicas, seeder_config, from_admin_port", replication_cases) async def test_replication_all( - df_local_factory, df_seeder_factory, t_master, t_replicas, seeder_config, from_admin_port + df_local_factory: DflyInstanceFactory, + df_seeder_factory, + t_master, + t_replicas, + seeder_config, + from_admin_port, ): master = df_local_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master) replicas = [ @@ -94,19 +98,19 @@ async def test_replication_all( await disconnect_clients(c_master, *c_replicas) -async def check_replica_finished_exec(c_replica, c_master): - role = await c_replica.execute_command("role") - if role[0] != b"replica" or role[3] != b"stable_sync": +async def check_replica_finished_exec(c_replica: aioredis.Redis, c_master: aioredis.Redis): + role = await c_replica.role() + if role[0] != "replica" or role[3] != "stable_sync": return False syncid, r_offset = await c_replica.execute_command("DEBUG REPLICA OFFSET") m_offset = await c_master.execute_command("DFLY REPLICAOFFSET") - print(" offset", syncid.decode(), r_offset, m_offset) + logging.debug(f" offset {syncid} {r_offset} {m_offset}") return r_offset == m_offset async def check_all_replicas_finished(c_replicas, c_master, timeout=20): - print("Waiting for replicas to finish") + logging.debug("Waiting for replicas to finish") waiting_for = list(c_replicas) start = time.time() @@ -120,6 +124,8 @@ async def check_all_replicas_finished(c_replicas, c_master, timeout=20): # Remove clients that finished from waiting list waiting_for = [c for (c, finished) in zip(waiting_for, finished_list) if not finished] + first_r: aioredis.Redis = waiting_for[0] + logging.error("Replica not finished, role %s", await first_r.role()) raise RuntimeError("Not all replicas finished in time!") @@ -186,14 +192,12 @@ async def test_disconnect_replica( # Start master master.start() - c_master = aioredis.Redis(port=master.port, single_connection_client=True) + c_master = master.client(single_connection_client=True) # Start replicas and create clients df_local_factory.start_all([replica for replica, _ in replicas]) - c_replicas = [ - (replica, aioredis.Redis(port=replica.port), crash_type) for replica, crash_type in replicas - ] + c_replicas = [(replica, replica.client(), crash_type) for replica, crash_type in replicas] def replicas_of_type(tfunc): return [args for args in c_replicas if tfunc(args[2])] @@ -203,15 +207,16 @@ async def test_disconnect_replica( fill_task = asyncio.create_task(seeder.run()) # Run full sync - async def full_sync(replica, c_replica, crash_type): - c_replica = aioredis.Redis(port=replica.port) + async def full_sync(replica: DflyInstance, c_replica, crash_type): + c_replica = replica.client(single_connection_client=True) await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) if crash_type == 0: await asyncio.sleep(random.random() / 100 + 0.01) - await c_replica.connection_pool.disconnect() + await c_replica.close() replica.stop(kill=True) else: await wait_available_async(c_replica) + await c_replica.close() await asyncio.gather(*(full_sync(*args) for args in c_replicas)) @@ -308,7 +313,7 @@ async def test_disconnect_master( replicas = [df_local_factory.create(proactor_threads=t) for i, t in enumerate(t_replicas)] df_local_factory.start_all(replicas) - c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas] + c_replicas = [replica.client() for replica in replicas] seeder = df_seeder_factory.create(port=master.port, keys=n_keys, dbcount=2) @@ -319,7 +324,7 @@ async def test_disconnect_master( async def start_master(): await asyncio.sleep(0.2) master.start() - async with aioredis.Redis(port=master.port) as c_master: + async with master.client() as c_master: assert await c_master.ping() seeder.reset() await seeder.run(target_deviation=0.1) @@ -380,7 +385,7 @@ async def test_rotating_masters( seeders = [df_seeder_factory.create(port=m.port, **seeder_config) for m in masters] - c_replica = aioredis.Redis(port=replica.port) + c_replica = replica.client() await asyncio.gather(*(seeder.run(target_deviation=0.1) for seeder in seeders)) @@ -425,7 +430,7 @@ async def test_cancel_replication_immediately( df_local_factory.start_all([replica, master]) seeder = df_seeder_factory.create(port=master.port) - c_replica = aioredis.Redis(port=replica.port, socket_timeout=20) + c_replica = replica.client(socket_timeout=20) await seeder.run(target_deviation=0.1) @@ -476,7 +481,7 @@ async def test_flushall(df_local_factory): replica.start() # Connect replica to master - c_replica = aioredis.Redis(port=replica.port) + c_replica = replica.client() await c_replica.execute_command(f"REPLICAOF localhost {master.port}") n_keys = 1000 @@ -485,7 +490,7 @@ async def test_flushall(df_local_factory): for i in range(start, end): yield f"key-{i}", f"value-{i}" - c_master = aioredis.Redis(port=master.port) + c_master = master.client() pipe = c_master.pipeline(transaction=False) # Set simple keys 0..n_keys on master batch_fill_data(client=pipe, gen=gen_test_data(0, n_keys), batch_size=3) @@ -530,8 +535,8 @@ async def test_rewrites(df_local_factory): replica.start() # Connect clients, connect replica to master - c_master = aioredis.Redis(port=master.port) - c_replica = aioredis.Redis(port=replica.port) + c_master = master.client() + c_replica = replica.client() await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) @@ -697,12 +702,12 @@ Test automatic replication of expiry. @pytest.mark.asyncio async def test_expiry(df_local_factory, n_keys=1000): master = df_local_factory.create() - replica = df_local_factory.create(logtostdout=True) + replica = df_local_factory.create() df_local_factory.start_all([master, replica]) - c_master = aioredis.Redis(port=master.port) - c_replica = aioredis.Redis(port=replica.port) + c_master = master.client() + c_replica = replica.client() # Connect replica to master await c_replica.execute_command(f"REPLICAOF localhost {master.port}") @@ -810,8 +815,8 @@ async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys df_local_factory.start_all([master] + replicas) - c_master = aioredis.Redis(port=master.port) - c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas] + c_master = master.client() + c_replicas = [replica.client() for replica in replicas] for c_replica in c_replicas: await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) @@ -824,7 +829,7 @@ async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys rsps = await asyncio.gather( *(c_master.evalsha(sha, len(keys), *keys, num_ops) for keys in key_sets) ) - assert rsps == [b"OK"] * num_par + assert rsps == ["OK"] * num_par await check_all_replicas_finished(c_replicas, c_master) @@ -847,8 +852,8 @@ async def test_auth_master(df_local_factory, n_keys=20): df_local_factory.start_all([master, replica]) - c_master = aioredis.Redis(port=master.port, password=masterpass) - c_replica = aioredis.Redis(port=replica.port, password=replicapass) + c_master = master.client(password=masterpass) + c_replica = replica.client(password=replicapass) # Connect replica to master await c_replica.execute_command(f"REPLICAOF localhost {master.port}") @@ -877,8 +882,8 @@ async def test_script_transfer(df_local_factory): df_local_factory.start_all([master, replica]) - c_master = aioredis.Redis(port=master.port) - c_replica = aioredis.Redis(port=replica.port) + c_master = master.client() + c_replica = replica.client() # Load some scripts into master ahead scripts = [] @@ -907,28 +912,30 @@ async def test_script_transfer(df_local_factory): @pytest.mark.asyncio async def test_role_command(df_local_factory, n_keys=20): master = df_local_factory.create() - replica = df_local_factory.create(logtostdout=True) + replica = df_local_factory.create() df_local_factory.start_all([master, replica]) - c_master = aioredis.Redis(port=master.port) - c_replica = aioredis.Redis(port=replica.port) + c_master = master.client() + c_replica = replica.client() - assert await c_master.execute_command("role") == [b"master", []] + assert await c_master.execute_command("role") == ["master", []] await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) + # It may take a bit more time to actually propagate the role change + # See https://github.com/dragonflydb/dragonfly/pull/2111 await asyncio.sleep(1) assert await c_master.execute_command("role") == [ - b"master", - [[b"127.0.0.1", bytes(str(replica.port), "ascii"), b"stable_sync"]], + "master", + [["127.0.0.1", str(replica.port), "stable_sync"]], ] assert await c_replica.execute_command("role") == [ - b"replica", - b"localhost", - bytes(str(master.port), "ascii"), - b"stable_sync", + "replica", + "localhost", + str(master.port), + "stable_sync", ] # This tests that we react fast to socket shutdowns and don't hang on @@ -936,18 +943,18 @@ async def test_role_command(df_local_factory, n_keys=20): master.stop() await asyncio.sleep(0.1) assert await c_replica.execute_command("role") == [ - b"replica", - b"localhost", - bytes(str(master.port), "ascii"), - b"connecting", + "replica", + "localhost", + str(master.port), + "connecting", ] await c_master.connection_pool.disconnect() await c_replica.connection_pool.disconnect() -def parse_lag(replication_info: bytes): - lags = re.findall(b"lag=([0-9]+)\r\n", replication_info) +def parse_lag(replication_info: str): + lags = re.findall("lag=([0-9]+)\r\n", replication_info) assert len(lags) == 1 return int(lags[0]) @@ -984,8 +991,8 @@ async def test_replication_info(df_local_factory, df_seeder_factory, n_keys=2000 master = df_local_factory.create() replica = df_local_factory.create(logtostdout=True, replication_acks_interval=100) df_local_factory.start_all([master, replica]) - c_master = aioredis.Redis(port=master.port) - c_replica = aioredis.Redis(port=replica.port) + c_master = master.client() + c_replica = replica.client() await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) @@ -1019,7 +1026,7 @@ async def test_flushall_in_full_sync(df_local_factory, df_seeder_factory): # Start master master.start() - c_master = aioredis.Redis(port=master.port) + c_master = master.client() # Fill master with test data seeder = df_seeder_factory.create(port=master.port, keys=100_000, dbcount=1) @@ -1027,7 +1034,7 @@ async def test_flushall_in_full_sync(df_local_factory, df_seeder_factory): # Start replica replica.start() - c_replica = aioredis.Redis(port=replica.port) + c_replica = replica.client() await c_replica.execute_command(f"REPLICAOF localhost {master.port}") async def get_sync_mode(c_master): @@ -1036,7 +1043,7 @@ async def test_flushall_in_full_sync(df_local_factory, df_seeder_factory): return result[1][0][2] async def is_full_sync_mode(c_master): - return await get_sync_mode(c_master) == b"full_sync" + return await get_sync_mode(c_master) == "full_sync" # Wait for full sync to start while not await is_full_sync_mode(c_master): @@ -1088,8 +1095,8 @@ async def test_readonly_script(df_local_factory): df_local_factory.start_all([master, replica]) - c_master = aioredis.Redis(port=master.port) - c_replica = aioredis.Redis(port=replica.port) + c_master = master.client() + c_replica = replica.client() await c_master.set("WORKS", "YES") @@ -1165,7 +1172,7 @@ async def test_take_over_counters(df_local_factory, master_threads, replica_thre _, _, *results = await asyncio.gather( delayed_takeover(), block_during_takeover(), *[counter(f"key{i}") for i in range(16)] ) - assert await c1.execute_command("role") == [b"master", []] + assert await c1.execute_command("role") == ["master", []] for key, client_value in results: replicated_value = await c1.get(key) @@ -1205,7 +1212,7 @@ async def test_take_over_seeder( await c_replica.execute_command(f"REPLTAKEOVER 5") seeder.stop() - assert await c_replica.execute_command("role") == [b"master", []] + assert await c_replica.execute_command("role") == ["master", []] # Need to wait a bit to give time to write the shutdown snapshot await asyncio.sleep(1) @@ -1251,14 +1258,14 @@ async def test_take_over_timeout(df_local_factory, df_seeder_factory): await fill_task assert await c_master.execute_command("role") == [ - b"master", - [[b"127.0.0.1", bytes(str(replica.port), "ascii"), b"stable_sync"]], + "master", + [["127.0.0.1", str(replica.port), "stable_sync"]], ] assert await c_replica.execute_command("role") == [ - b"replica", - b"localhost", - bytes(str(master.port), "ascii"), - b"stable_sync", + "replica", + "localhost", + str(master.port), + "stable_sync", ] await disconnect_clients(c_master, c_replica) @@ -1272,7 +1279,11 @@ replication_cases = [(8, 8)] @pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replica", replication_cases) async def test_no_tls_on_admin_port( - df_local_factory, df_seeder_factory, t_master, t_replica, with_tls_server_args + df_local_factory: DflyInstanceFactory, + df_seeder_factory, + t_master, + t_replica, + with_tls_server_args, ): # 1. Spin up dragonfly without tls, debug populate master = df_local_factory.create( @@ -1283,7 +1294,7 @@ async def test_no_tls_on_admin_port( proactor_threads=t_master, ) master.start() - c_master = aioredis.Redis(port=master.admin_port, password="XXX") + c_master = master.admin_client(password="XXX") await c_master.execute_command("DEBUG POPULATE 100") db_size = await c_master.execute_command("DBSIZE") assert 100 == db_size @@ -1298,9 +1309,9 @@ async def test_no_tls_on_admin_port( masterauth="XXX", ) replica.start() - c_replica = aioredis.Redis(port=replica.admin_port, password="XXX") + c_replica = replica.admin_client(password="XXX") res = await c_replica.execute_command("REPLICAOF localhost " + str(master.admin_port)) - assert b"OK" == res + assert "OK" == res await check_all_replicas_finished([c_replica], c_master) # 3. Verify that replica dbsize == debug populate key size -- replication works @@ -1336,7 +1347,7 @@ async def test_tls_replication( proactor_threads=t_master, ) master.start() - c_master = aioredis.Redis(port=master.port, **with_ca_tls_client_args) + c_master = master.client(**with_ca_tls_client_args) await c_master.execute_command("DEBUG POPULATE 100") db_size = await c_master.execute_command("DBSIZE") assert 100 == db_size @@ -1348,10 +1359,10 @@ async def test_tls_replication( proactor_threads=t_replica, ) replica.start() - c_replica = aioredis.Redis(port=replica.port, **with_ca_tls_client_args) + c_replica = replica.client(**with_ca_tls_client_args) port = master.port if not test_admin_port else master.admin_port res = await c_replica.execute_command("REPLICAOF localhost " + str(port)) - assert b"OK" == res + assert "OK" == res await check_all_replicas_finished([c_replica], c_master) # 3. Verify that replica dbsize == debug populate key size -- replication works @@ -1362,7 +1373,7 @@ async def test_tls_replication( master.stop(kill=True) await asyncio.sleep(3) master.start() - c_master = aioredis.Redis(port=master.port, **with_ca_tls_client_args) + c_master = master.client(**with_ca_tls_client_args) # Master doesn't load the snapshot, therefore dbsize should be 0 await c_master.execute_command("SET MY_KEY 1") db_size = await c_master.execute_command("DBSIZE") @@ -1399,8 +1410,8 @@ async def test_replicaof_flag(df_local_factory): # set up master master.start() - c_master = aioredis.Redis(port=master.port) - await c_master.set("KEY", b"VALUE") + c_master = master.client() + await c_master.set("KEY", "VALUE") db_size = await c_master.dbsize() assert 1 == db_size @@ -1411,7 +1422,7 @@ async def test_replicaof_flag(df_local_factory): # set up replica. check that it is replicating replica.start() - c_replica = aioredis.Redis(port=replica.port) + c_replica = replica.client() await wait_available_async(c_replica) # give it time to startup # wait until we have a connection @@ -1422,7 +1433,7 @@ async def test_replicaof_flag(df_local_factory): assert 1 == dbsize val = await c_replica.get("KEY") - assert b"VALUE" == val + assert "VALUE" == val @pytest.mark.asyncio @@ -1436,7 +1447,7 @@ async def test_replicaof_flag_replication_waits(df_local_factory): # set up replica first replica.start() - c_replica = aioredis.Redis(port=replica.port) + c_replica = replica.client() await wait_for_replica_status(c_replica, status="down") # check that it is in replica mode, yet status is down @@ -1453,8 +1464,8 @@ async def test_replicaof_flag_replication_waits(df_local_factory): ) master.start() - c_master = aioredis.Redis(port=master.port) - await c_master.set("KEY", b"VALUE") + c_master = master.client() + await c_master.set("KEY", "VALUE") db_size = await c_master.dbsize() assert 1 == db_size @@ -1466,7 +1477,7 @@ async def test_replicaof_flag_replication_waits(df_local_factory): assert 1 == dbsize val = await c_replica.get("KEY") - assert b"VALUE" == val + assert "VALUE" == val @pytest.mark.asyncio @@ -1478,10 +1489,10 @@ async def test_replicaof_flag_disconnect(df_local_factory): # set up master master.start() - c_master = aioredis.Redis(port=master.port) + c_master = master.client() await wait_available_async(c_master) - await c_master.set("KEY", b"VALUE") + await c_master.set("KEY", "VALUE") db_size = await c_master.dbsize() assert 1 == db_size @@ -1493,7 +1504,7 @@ async def test_replicaof_flag_disconnect(df_local_factory): # set up replica. check that it is replicating replica.start() - c_replica = aioredis.Redis(port=replica.port) + c_replica = replica.client() await wait_available_async(c_replica) await wait_for_replica_status(c_replica, status="up") await check_all_replicas_finished([c_replica], c_master) @@ -1502,12 +1513,12 @@ async def test_replicaof_flag_disconnect(df_local_factory): assert 1 == dbsize val = await c_replica.get("KEY") - assert b"VALUE" == val + assert "VALUE" == val await c_replica.replicaof("no", "one") # disconnect role = await c_replica.role() - assert role[0] == b"master" + assert role[0] == "master" @pytest.mark.asyncio @@ -1525,10 +1536,10 @@ async def test_df_crash_on_memcached_error(df_local_factory): master.start() replica.start() - c_master = aioredis.Redis(port=master.port) + c_master = master.client() await wait_available_async(c_master) - c_replica = aioredis.Redis(port=replica.port) + c_replica = replica.client() await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) await wait_for_replica_status(c_replica, status="up") @@ -1537,7 +1548,7 @@ async def test_df_crash_on_memcached_error(df_local_factory): memcached_client = pymemcache.Client(f"localhost:{replica.mc_port}") with pytest.raises(pymemcache.exceptions.MemcacheServerError): - memcached_client.set(b"key", b"data", noreply=False) + memcached_client.set("key", "data", noreply=False) await c_master.close() @@ -1552,8 +1563,8 @@ async def test_df_crash_on_replicaof_flag(df_local_factory): replica = df_local_factory.create(proactor_threads=2, replicaof=f"127.0.0.1:{master.port}") replica.start() - c_master = aioredis.Redis(port=master.port) - c_replica = aioredis.Redis(port=replica.port, decode_responses=True) + c_master = master.client() + c_replica = replica.client() await wait_available_async(c_master) await wait_available_async(c_replica) diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index 94b94e739..5f3842809 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -148,7 +148,7 @@ class TestDflyAutoLoadSnapshot(SnapshotTestBase): async with df_server.client() as client: await wait_available_async(client) response = await client.get("TEST") - assert response.decode("utf-8") == str(hash(dbfilename)) + assert response == str(hash(dbfilename)) @dfly_args({**BASIC_ARGS, "dbfilename": "test-periodic", "save_schedule": "*:*"})