
chore: use decode_responses when creating a redis client (#2109)

* chore: use decode_responses when creating a redis client

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Author: Roman Gershman <roman@dragonflydb.io>
Date: 2023-11-03 11:00:26 +02:00 (committed by GitHub)
Commit: 7aa3dba423 (parent: 2f39e89189)
6 changed files with 147 additions and 116 deletions
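For context on the change itself: redis-py returns raw bytes by default, which is why the tests below were full of b"..." literals and .decode() calls. Passing decode_responses=True makes the client return str instead. A minimal sketch of the difference, assuming only a Redis-compatible server on localhost:6379 (illustrative, not part of this commit):

import redis

raw = redis.Redis(port=6379)                         # default: replies are bytes
dec = redis.Redis(port=6379, decode_responses=True)  # this commit's choice: replies are str

raw.set("greeting", "hello")
assert raw.get("greeting") == b"hello"  # needs b"..." or .decode()
assert dec.get("greeting") == "hello"   # plain str comparison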


@@ -37,7 +37,11 @@ runs:
         export DRAGONFLY_PATH="${GITHUB_WORKSPACE}/${{inputs.build-folder-name}}/${{inputs.dfly-executable}}"
         export UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1 # to crash on errors
-        timeout 20m pytest -m "${{inputs.filter}}" --json-report --json-report-file=report.json dragonfly --ignore=dragonfly/replication_test.py --log-cli-level=INFO || code=$?; if [[ $code -ne 0 ]]; then echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"; exit 1; fi
+        timeout 20m pytest -m "${{inputs.filter}}" --color=yes --json-report --json-report-file=report.json dragonfly --ignore=dragonfly/replication_test.py --log-cli-level=INFO || code=$?
+        if [[ $code -ne 0 ]]; then
+          echo "TIMEDOUT=1">> "$GITHUB_OUTPUT";
+          exit 1
+        fi

     - name: Run PyTests replication test
       id: second
@@ -50,7 +54,13 @@ runs:
         export DRAGONFLY_PATH="${GITHUB_WORKSPACE}/${{inputs.build-folder-name}}/${{inputs.dfly-executable}}"
         run_pytest_with_args() {
-          timeout 20m pytest -m "${{inputs.filter}}" --json-report --json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO --df alsologtostderr $1 $2 || code=$?; if [[ $code -ne 0 ]]; then echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"; exit 1; fi
+          timeout 20m pytest -m "${{inputs.filter}}" --color=yes --json-report \
+            --json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO \
+            --df alsologtostderr $1 $2 || code=$?
+          if [[ $code -ne 0 ]]; then
+            echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"
+            exit 1
+          fi
         }

         (run_pytest_with_args --df enable_multi_shard_sync=true)


@@ -4,23 +4,27 @@ import redis
 from redis import asyncio as aioredis
 import asyncio

+from .instance import DflyInstanceFactory
 from .utility import *
 from .replication_test import check_all_replicas_finished
 from . import dfly_args

 BASE_PORT = 30001


 async def push_config(config, admin_connections):
-    print("Pushing config ", config)
-    await asyncio.gather(
+    logging.debug("Pushing config %s", config)
+    res = await asyncio.gather(
         *(c_admin.execute_command("DFLYCLUSTER", "CONFIG", config) for c_admin in admin_connections)
     )
+    assert all([r == "OK" for r in res])


 async def get_node_id(admin_connection):
     id = await admin_connection.execute_command("DFLYCLUSTER MYID")
-    return id.decode()
+    assert isinstance(id, str)
+    return id


 @dfly_args({})
@@ -168,7 +172,7 @@ Also add keys to each of them that are *not* moved, and see that they are unaffe
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_slot_ownership_changes(df_local_factory):
+async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFactory):
     # Start and configure cluster with 2 nodes
     nodes = [
         df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
@@ -177,8 +181,8 @@ async def test_cluster_slot_ownership_changes(df_local_factory):
     df_local_factory.start_all(nodes)

-    c_nodes = [aioredis.Redis(port=node.port) for node in nodes]
-    c_nodes_admin = [aioredis.Redis(port=node.admin_port) for node in nodes]
+    c_nodes = [node.client() for node in nodes]
+    c_nodes_admin = [node.admin_client() for node in nodes]

     node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))
@@ -233,7 +237,7 @@ async def test_cluster_slot_ownership_changes(df_local_factory):
     assert await c_nodes[0].execute_command("DBSIZE") == 2

     # Make sure that node0 owns "KEY0"
-    assert (await c_nodes[0].get("KEY0")).decode() == "value"
+    assert (await c_nodes[0].get("KEY0")) == "value"

     # Make sure that "KEY1" is not owned by node1
     try:
@@ -255,7 +259,7 @@ async def test_cluster_slot_ownership_changes(df_local_factory):
     # node0 should have removed "KEY1" as it no longer owns it
     assert await c_nodes[0].execute_command("DBSIZE") == 1
     # node0 should still own "KEY0" though
-    assert (await c_nodes[0].get("KEY0")).decode() == "value"
+    assert (await c_nodes[0].get("KEY0")) == "value"

     # node1 should still have "KEY2"
     assert await c_nodes[1].execute_command("DBSIZE") == 1
@@ -291,7 +295,7 @@ async def test_cluster_slot_ownership_changes(df_local_factory):
     await push_config(config, c_nodes_admin)

     assert await c_nodes[0].execute_command("DBSIZE") == 1
-    assert (await c_nodes[0].get("KEY0")).decode() == "value"
+    assert (await c_nodes[0].get("KEY0")) == "value"
     assert await c_nodes[1].execute_command("DBSIZE") == 0
@@ -337,7 +341,7 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory):
     await c_master.set("key", "value")
     await c_replica.execute_command("REPLICAOF", "localhost", master.port)
     await check_all_replicas_finished([c_replica], c_master)
-    assert (await c_replica.get("key")).decode() == "value"
+    assert (await c_replica.get("key")) == "value"
     assert await c_replica.execute_command("dbsize") == 1

     # Tell the replica that it and the master no longer own any data, but don't tell that to the
@@ -405,18 +409,18 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_flush_slots_after_config_change(df_local_factory):
+async def test_cluster_flush_slots_after_config_change(df_local_factory: DflyInstanceFactory):
     # Start and configure cluster with 1 master and 1 replica, both own all slots
     master = df_local_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
     replica = df_local_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001)
     df_local_factory.start_all([master, replica])

-    c_master = aioredis.Redis(port=master.port)
-    c_master_admin = aioredis.Redis(port=master.admin_port)
+    c_master = master.client()
+    c_master_admin = master.admin_client()
     master_id = await get_node_id(c_master_admin)

-    c_replica = aioredis.Redis(port=replica.port)
-    c_replica_admin = aioredis.Redis(port=replica.admin_port)
+    c_replica = replica.client()
+    c_replica_admin = replica.admin_client()
     replica_id = await get_node_id(c_replica_admin)

     config = f"""
@@ -512,7 +516,7 @@ async def test_cluster_flush_slots_after_config_change(df_local_factory):
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_native_client(df_local_factory):
+async def test_cluster_native_client(df_local_factory: DflyInstanceFactory):
     # Start and configure cluster with 3 masters and 3 replicas
     masters = [
         df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
@@ -520,7 +524,7 @@ async def test_cluster_native_client(df_local_factory):
     ]
     df_local_factory.start_all(masters)
     c_masters = [aioredis.Redis(port=master.port) for master in masters]
-    c_masters_admin = [aioredis.Redis(port=master.admin_port) for master in masters]
+    c_masters_admin = [master.admin_client() for master in masters]
     master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin))

     replicas = [
@@ -528,8 +532,8 @@ async def test_cluster_native_client(df_local_factory):
         for i in range(3)
     ]
     df_local_factory.start_all(replicas)
-    c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
-    c_replicas_admin = [aioredis.Redis(port=replica.admin_port) for replica in replicas]
+    c_replicas = [replica.client() for replica in replicas]
+    c_replicas_admin = [replica.admin_client() for replica in replicas]
     replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas_admin))

     config = f"""
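Because the admin clients created above now decode responses, DFLYCLUSTER MYID comes back as str, which is exactly what the tightened get_node_id asserts. A hedged sketch of the two helpers working together (collect_node_ids is a hypothetical name; nodes are started instances from the test factory):

async def collect_node_ids(nodes):
    # Hypothetical helper: gather every node's cluster id over its admin port.
    admins = [node.admin_client() for node in nodes]
    ids = await asyncio.gather(*(get_node_id(admin) for admin in admins))
    assert all(isinstance(node_id, str) for node_id in ids)  # no .decode() needed
    return ids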


@@ -9,7 +9,7 @@ async def test_maxclients(df_factory):
     # Needs some authentication
     with df_factory.create(port=1111, maxclients=1, admin_port=1112) as server:
         async with server.client() as client1:
-            assert [b"maxclients", b"1"] == await client1.execute_command("CONFIG GET maxclients")
+            assert ["maxclients", "1"] == await client1.execute_command("CONFIG GET maxclients")

             with pytest.raises(redis.exceptions.ConnectionError):
                 async with server.client() as client2:
@@ -20,6 +20,6 @@ async def test_maxclients(df_factory):
                 await admin_client.get("test")

             await client1.execute_command("CONFIG SET maxclients 3")
-            assert [b"maxclients", b"3"] == await client1.execute_command("CONFIG GET maxclients")
+            assert ["maxclients", "3"] == await client1.execute_command("CONFIG GET maxclients")
             async with server.client() as client2:
                 await client2.get("test")


@@ -91,10 +91,16 @@ class DflyInstance:
         assert self.proc == None

     def client(self, *args, **kwargs) -> RedisClient:
-        return RedisClient(port=self.port, *args, **kwargs)
+        return RedisClient(port=self.port, decode_responses=True, *args, **kwargs)

     def admin_client(self, *args, **kwargs) -> RedisClient:
-        return RedisClient(port=self.admin_port, *args, **kwargs)
+        return RedisClient(
+            port=self.admin_port,
+            single_connection_client=True,
+            decode_responses=True,
+            *args,
+            **kwargs,
+        )

     def __enter__(self):
         self.start()
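After this change, client() and admin_client() are the single place where decode_responses is configured, so tests construct clients through the instance rather than calling aioredis.Redis(port=...) directly. A sketch of the intended call pattern, with fixture and constant names borrowed from the tests in this commit (illustrative only, not part of the diff):

async def example(df_local_factory):
    instance = df_local_factory.create(port=BASE_PORT)  # BASE_PORT as in the cluster tests
    instance.start()
    client = instance.client()  # decode_responses=True is applied for us
    await client.set("k", "v")
    assert await client.get("k") == "v"  # str now, not b"v"
    await client.close()
    instance.stop()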


@@ -3,10 +3,9 @@ from itertools import chain, repeat
 import re
 import pytest
 import asyncio
-import redis
 from redis import asyncio as aioredis
 from .utility import *
-from .instance import DflyInstanceFactory
+from .instance import DflyInstanceFactory, DflyInstance
 from . import dfly_args
 import pymemcache
 import logging
@@ -42,7 +41,12 @@ replication_cases = [
 @pytest.mark.slow
 @pytest.mark.parametrize("t_master, t_replicas, seeder_config, from_admin_port", replication_cases)
 async def test_replication_all(
-    df_local_factory, df_seeder_factory, t_master, t_replicas, seeder_config, from_admin_port
+    df_local_factory: DflyInstanceFactory,
+    df_seeder_factory,
+    t_master,
+    t_replicas,
+    seeder_config,
+    from_admin_port,
 ):
     master = df_local_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master)
     replicas = [
@@ -94,19 +98,19 @@ async def test_replication_all(
     await disconnect_clients(c_master, *c_replicas)


-async def check_replica_finished_exec(c_replica, c_master):
-    role = await c_replica.execute_command("role")
-    if role[0] != b"replica" or role[3] != b"stable_sync":
+async def check_replica_finished_exec(c_replica: aioredis.Redis, c_master: aioredis.Redis):
+    role = await c_replica.role()
+    if role[0] != "replica" or role[3] != "stable_sync":
         return False

     syncid, r_offset = await c_replica.execute_command("DEBUG REPLICA OFFSET")
     m_offset = await c_master.execute_command("DFLY REPLICAOFFSET")

-    print(" offset", syncid.decode(), r_offset, m_offset)
+    logging.debug(f" offset {syncid} {r_offset} {m_offset}")
     return r_offset == m_offset


 async def check_all_replicas_finished(c_replicas, c_master, timeout=20):
-    print("Waiting for replicas to finish")
+    logging.debug("Waiting for replicas to finish")
     waiting_for = list(c_replicas)
     start = time.time()
@@ -120,6 +124,8 @@ async def check_all_replicas_finished(c_replicas, c_master, timeout=20):
         # Remove clients that finished from waiting list
         waiting_for = [c for (c, finished) in zip(waiting_for, finished_list) if not finished]

+    first_r: aioredis.Redis = waiting_for[0]
+    logging.error("Replica not finished, role %s", await first_r.role())
     raise RuntimeError("Not all replicas finished in time!")
@@ -186,14 +192,12 @@ async def test_disconnect_replica(
     # Start master
     master.start()
-    c_master = aioredis.Redis(port=master.port, single_connection_client=True)
+    c_master = master.client(single_connection_client=True)

     # Start replicas and create clients
     df_local_factory.start_all([replica for replica, _ in replicas])
-    c_replicas = [
-        (replica, aioredis.Redis(port=replica.port), crash_type) for replica, crash_type in replicas
-    ]
+    c_replicas = [(replica, replica.client(), crash_type) for replica, crash_type in replicas]

     def replicas_of_type(tfunc):
         return [args for args in c_replicas if tfunc(args[2])]
@@ -203,15 +207,16 @@ async def test_disconnect_replica(
     fill_task = asyncio.create_task(seeder.run())

     # Run full sync
-    async def full_sync(replica, c_replica, crash_type):
-        c_replica = aioredis.Redis(port=replica.port)
+    async def full_sync(replica: DflyInstance, c_replica, crash_type):
+        c_replica = replica.client(single_connection_client=True)
         await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
         if crash_type == 0:
             await asyncio.sleep(random.random() / 100 + 0.01)
-            await c_replica.connection_pool.disconnect()
+            await c_replica.close()
             replica.stop(kill=True)
         else:
             await wait_available_async(c_replica)
+            await c_replica.close()

     await asyncio.gather(*(full_sync(*args) for args in c_replicas))
@@ -308,7 +313,7 @@ async def test_disconnect_master(
     replicas = [df_local_factory.create(proactor_threads=t) for i, t in enumerate(t_replicas)]
     df_local_factory.start_all(replicas)
-    c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
+    c_replicas = [replica.client() for replica in replicas]

     seeder = df_seeder_factory.create(port=master.port, keys=n_keys, dbcount=2)
@@ -319,7 +324,7 @@ async def test_disconnect_master(
     async def start_master():
         await asyncio.sleep(0.2)
         master.start()
-        async with aioredis.Redis(port=master.port) as c_master:
+        async with master.client() as c_master:
             assert await c_master.ping()
             seeder.reset()
             await seeder.run(target_deviation=0.1)
@@ -380,7 +385,7 @@ async def test_rotating_masters(
     seeders = [df_seeder_factory.create(port=m.port, **seeder_config) for m in masters]

-    c_replica = aioredis.Redis(port=replica.port)
+    c_replica = replica.client()

     await asyncio.gather(*(seeder.run(target_deviation=0.1) for seeder in seeders))
@@ -425,7 +430,7 @@ async def test_cancel_replication_immediately(
     df_local_factory.start_all([replica, master])
     seeder = df_seeder_factory.create(port=master.port)

-    c_replica = aioredis.Redis(port=replica.port, socket_timeout=20)
+    c_replica = replica.client(socket_timeout=20)

     await seeder.run(target_deviation=0.1)
@@ -476,7 +481,7 @@ async def test_flushall(df_local_factory):
     replica.start()

     # Connect replica to master
-    c_replica = aioredis.Redis(port=replica.port)
+    c_replica = replica.client()
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")

     n_keys = 1000
@@ -485,7 +490,7 @@ async def test_flushall(df_local_factory):
         for i in range(start, end):
             yield f"key-{i}", f"value-{i}"

-    c_master = aioredis.Redis(port=master.port)
+    c_master = master.client()
     pipe = c_master.pipeline(transaction=False)
     # Set simple keys 0..n_keys on master
     batch_fill_data(client=pipe, gen=gen_test_data(0, n_keys), batch_size=3)
@@ -530,8 +535,8 @@ async def test_rewrites(df_local_factory):
     replica.start()

     # Connect clients, connect replica to master
-    c_master = aioredis.Redis(port=master.port)
-    c_replica = aioredis.Redis(port=replica.port)
+    c_master = master.client()
+    c_replica = replica.client()
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
     await wait_available_async(c_replica)
@@ -697,12 +702,12 @@ Test automatic replication of expiry.
 @pytest.mark.asyncio
 async def test_expiry(df_local_factory, n_keys=1000):
     master = df_local_factory.create()
-    replica = df_local_factory.create(logtostdout=True)
+    replica = df_local_factory.create()

     df_local_factory.start_all([master, replica])

-    c_master = aioredis.Redis(port=master.port)
-    c_replica = aioredis.Redis(port=replica.port)
+    c_master = master.client()
+    c_replica = replica.client()

     # Connect replica to master
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
@@ -810,8 +815,8 @@ async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys
     df_local_factory.start_all([master] + replicas)

-    c_master = aioredis.Redis(port=master.port)
-    c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
+    c_master = master.client()
+    c_replicas = [replica.client() for replica in replicas]
     for c_replica in c_replicas:
         await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
         await wait_available_async(c_replica)
@@ -824,7 +829,7 @@ async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys
     rsps = await asyncio.gather(
         *(c_master.evalsha(sha, len(keys), *keys, num_ops) for keys in key_sets)
     )
-    assert rsps == [b"OK"] * num_par
+    assert rsps == ["OK"] * num_par

     await check_all_replicas_finished(c_replicas, c_master)
@@ -847,8 +852,8 @@ async def test_auth_master(df_local_factory, n_keys=20):
     df_local_factory.start_all([master, replica])

-    c_master = aioredis.Redis(port=master.port, password=masterpass)
-    c_replica = aioredis.Redis(port=replica.port, password=replicapass)
+    c_master = master.client(password=masterpass)
+    c_replica = replica.client(password=replicapass)

     # Connect replica to master
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
@@ -877,8 +882,8 @@ async def test_script_transfer(df_local_factory):
     df_local_factory.start_all([master, replica])

-    c_master = aioredis.Redis(port=master.port)
-    c_replica = aioredis.Redis(port=replica.port)
+    c_master = master.client()
+    c_replica = replica.client()

     # Load some scripts into master ahead
     scripts = []
@@ -907,28 +912,30 @@ async def test_script_transfer(df_local_factory):
 @pytest.mark.asyncio
 async def test_role_command(df_local_factory, n_keys=20):
     master = df_local_factory.create()
-    replica = df_local_factory.create(logtostdout=True)
+    replica = df_local_factory.create()

     df_local_factory.start_all([master, replica])

-    c_master = aioredis.Redis(port=master.port)
-    c_replica = aioredis.Redis(port=replica.port)
+    c_master = master.client()
+    c_replica = replica.client()

-    assert await c_master.execute_command("role") == [b"master", []]
+    assert await c_master.execute_command("role") == ["master", []]
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
     await wait_available_async(c_replica)

+    # It may take a bit more time to actually propagate the role change
+    # See https://github.com/dragonflydb/dragonfly/pull/2111
     await asyncio.sleep(1)

     assert await c_master.execute_command("role") == [
-        b"master",
-        [[b"127.0.0.1", bytes(str(replica.port), "ascii"), b"stable_sync"]],
+        "master",
+        [["127.0.0.1", str(replica.port), "stable_sync"]],
     ]
     assert await c_replica.execute_command("role") == [
-        b"replica",
-        b"localhost",
-        bytes(str(master.port), "ascii"),
-        b"stable_sync",
+        "replica",
+        "localhost",
+        str(master.port),
+        "stable_sync",
     ]

     # This tests that we react fast to socket shutdowns and don't hang on
@@ -936,18 +943,18 @@ async def test_role_command(df_local_factory, n_keys=20):
     master.stop()
     await asyncio.sleep(0.1)
     assert await c_replica.execute_command("role") == [
-        b"replica",
-        b"localhost",
-        bytes(str(master.port), "ascii"),
-        b"connecting",
+        "replica",
+        "localhost",
+        str(master.port),
+        "connecting",
     ]

     await c_master.connection_pool.disconnect()
     await c_replica.connection_pool.disconnect()


-def parse_lag(replication_info: bytes):
-    lags = re.findall(b"lag=([0-9]+)\r\n", replication_info)
+def parse_lag(replication_info: str):
+    lags = re.findall("lag=([0-9]+)\r\n", replication_info)
     assert len(lags) == 1
     return int(lags[0])
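With INFO output now arriving as str, parse_lag drops the bytes pattern in favor of a plain string regex. A quick check against a made-up slave entry of the shape INFO replication produces:

info = "slave0:ip=127.0.0.1,port=30002,state=online,offset=100,lag=0\r\n"  # illustrative payload
assert parse_lag(info) == 0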
@@ -984,8 +991,8 @@ async def test_replication_info(df_local_factory, df_seeder_factory, n_keys=2000
     master = df_local_factory.create()
     replica = df_local_factory.create(logtostdout=True, replication_acks_interval=100)
     df_local_factory.start_all([master, replica])
-    c_master = aioredis.Redis(port=master.port)
-    c_replica = aioredis.Redis(port=replica.port)
+    c_master = master.client()
+    c_replica = replica.client()

     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
     await wait_available_async(c_replica)
@@ -1019,7 +1026,7 @@ async def test_flushall_in_full_sync(df_local_factory, df_seeder_factory):
     # Start master
     master.start()
-    c_master = aioredis.Redis(port=master.port)
+    c_master = master.client()

     # Fill master with test data
     seeder = df_seeder_factory.create(port=master.port, keys=100_000, dbcount=1)
@@ -1027,7 +1034,7 @@ async def test_flushall_in_full_sync(df_local_factory, df_seeder_factory):
     # Start replica
     replica.start()
-    c_replica = aioredis.Redis(port=replica.port)
+    c_replica = replica.client()
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")

     async def get_sync_mode(c_master):
@@ -1036,7 +1043,7 @@ async def test_flushall_in_full_sync(df_local_factory, df_seeder_factory):
         return result[1][0][2]

     async def is_full_sync_mode(c_master):
-        return await get_sync_mode(c_master) == b"full_sync"
+        return await get_sync_mode(c_master) == "full_sync"

     # Wait for full sync to start
     while not await is_full_sync_mode(c_master):
@@ -1088,8 +1095,8 @@ async def test_readonly_script(df_local_factory):
     df_local_factory.start_all([master, replica])

-    c_master = aioredis.Redis(port=master.port)
-    c_replica = aioredis.Redis(port=replica.port)
+    c_master = master.client()
+    c_replica = replica.client()

     await c_master.set("WORKS", "YES")
@@ -1165,7 +1172,7 @@ async def test_take_over_counters(df_local_factory, master_threads, replica_thre
     _, _, *results = await asyncio.gather(
         delayed_takeover(), block_during_takeover(), *[counter(f"key{i}") for i in range(16)]
     )
-    assert await c1.execute_command("role") == [b"master", []]
+    assert await c1.execute_command("role") == ["master", []]

     for key, client_value in results:
         replicated_value = await c1.get(key)
@@ -1205,7 +1212,7 @@ async def test_take_over_seeder(
     await c_replica.execute_command(f"REPLTAKEOVER 5")
     seeder.stop()

-    assert await c_replica.execute_command("role") == [b"master", []]
+    assert await c_replica.execute_command("role") == ["master", []]

     # Need to wait a bit to give time to write the shutdown snapshot
     await asyncio.sleep(1)
@@ -1251,14 +1258,14 @@ async def test_take_over_timeout(df_local_factory, df_seeder_factory):
     await fill_task

     assert await c_master.execute_command("role") == [
-        b"master",
-        [[b"127.0.0.1", bytes(str(replica.port), "ascii"), b"stable_sync"]],
+        "master",
+        [["127.0.0.1", str(replica.port), "stable_sync"]],
     ]
     assert await c_replica.execute_command("role") == [
-        b"replica",
-        b"localhost",
-        bytes(str(master.port), "ascii"),
-        b"stable_sync",
+        "replica",
+        "localhost",
+        str(master.port),
+        "stable_sync",
     ]

     await disconnect_clients(c_master, c_replica)
@@ -1272,7 +1279,11 @@ replication_cases = [(8, 8)]
 @pytest.mark.asyncio
 @pytest.mark.parametrize("t_master, t_replica", replication_cases)
 async def test_no_tls_on_admin_port(
-    df_local_factory, df_seeder_factory, t_master, t_replica, with_tls_server_args
+    df_local_factory: DflyInstanceFactory,
+    df_seeder_factory,
+    t_master,
+    t_replica,
+    with_tls_server_args,
 ):
     # 1. Spin up dragonfly without tls, debug populate
     master = df_local_factory.create(
@@ -1283,7 +1294,7 @@ async def test_no_tls_on_admin_port(
         proactor_threads=t_master,
     )
     master.start()
-    c_master = aioredis.Redis(port=master.admin_port, password="XXX")
+    c_master = master.admin_client(password="XXX")
     await c_master.execute_command("DEBUG POPULATE 100")
     db_size = await c_master.execute_command("DBSIZE")
     assert 100 == db_size
@@ -1298,9 +1309,9 @@ async def test_no_tls_on_admin_port(
         masterauth="XXX",
     )
     replica.start()
-    c_replica = aioredis.Redis(port=replica.admin_port, password="XXX")
+    c_replica = replica.admin_client(password="XXX")
     res = await c_replica.execute_command("REPLICAOF localhost " + str(master.admin_port))
-    assert b"OK" == res
+    assert "OK" == res
     await check_all_replicas_finished([c_replica], c_master)

     # 3. Verify that replica dbsize == debug populate key size -- replication works
@@ -1336,7 +1347,7 @@ async def test_tls_replication(
         proactor_threads=t_master,
     )
     master.start()
-    c_master = aioredis.Redis(port=master.port, **with_ca_tls_client_args)
+    c_master = master.client(**with_ca_tls_client_args)
     await c_master.execute_command("DEBUG POPULATE 100")
     db_size = await c_master.execute_command("DBSIZE")
     assert 100 == db_size
@@ -1348,10 +1359,10 @@ async def test_tls_replication(
         proactor_threads=t_replica,
     )
     replica.start()
-    c_replica = aioredis.Redis(port=replica.port, **with_ca_tls_client_args)
+    c_replica = replica.client(**with_ca_tls_client_args)
     port = master.port if not test_admin_port else master.admin_port
     res = await c_replica.execute_command("REPLICAOF localhost " + str(port))
-    assert b"OK" == res
+    assert "OK" == res
     await check_all_replicas_finished([c_replica], c_master)

     # 3. Verify that replica dbsize == debug populate key size -- replication works
@@ -1362,7 +1373,7 @@ async def test_tls_replication(
     master.stop(kill=True)
     await asyncio.sleep(3)
     master.start()
-    c_master = aioredis.Redis(port=master.port, **with_ca_tls_client_args)
+    c_master = master.client(**with_ca_tls_client_args)
     # Master doesn't load the snapshot, therefore dbsize should be 0
     await c_master.execute_command("SET MY_KEY 1")
     db_size = await c_master.execute_command("DBSIZE")
@@ -1399,8 +1410,8 @@ async def test_replicaof_flag(df_local_factory):
     # set up master
     master.start()
-    c_master = aioredis.Redis(port=master.port)
-    await c_master.set("KEY", b"VALUE")
+    c_master = master.client()
+    await c_master.set("KEY", "VALUE")
     db_size = await c_master.dbsize()
     assert 1 == db_size
@@ -1411,7 +1422,7 @@ async def test_replicaof_flag(df_local_factory):
     # set up replica. check that it is replicating
     replica.start()
-    c_replica = aioredis.Redis(port=replica.port)
+    c_replica = replica.client()

     await wait_available_async(c_replica)  # give it time to startup
     # wait until we have a connection
@@ -1422,7 +1433,7 @@ async def test_replicaof_flag(df_local_factory):
     assert 1 == dbsize

     val = await c_replica.get("KEY")
-    assert b"VALUE" == val
+    assert "VALUE" == val


 @pytest.mark.asyncio
@@ -1436,7 +1447,7 @@ async def test_replicaof_flag_replication_waits(df_local_factory):
     # set up replica first
     replica.start()
-    c_replica = aioredis.Redis(port=replica.port)
+    c_replica = replica.client()
     await wait_for_replica_status(c_replica, status="down")

     # check that it is in replica mode, yet status is down
@@ -1453,8 +1464,8 @@ async def test_replicaof_flag_replication_waits(df_local_factory):
     )
     master.start()
-    c_master = aioredis.Redis(port=master.port)
-    await c_master.set("KEY", b"VALUE")
+    c_master = master.client()
+    await c_master.set("KEY", "VALUE")
     db_size = await c_master.dbsize()
     assert 1 == db_size
@@ -1466,7 +1477,7 @@ async def test_replicaof_flag_replication_waits(df_local_factory):
     assert 1 == dbsize

     val = await c_replica.get("KEY")
-    assert b"VALUE" == val
+    assert "VALUE" == val


 @pytest.mark.asyncio
@@ -1478,10 +1489,10 @@ async def test_replicaof_flag_disconnect(df_local_factory):
     # set up master
     master.start()
-    c_master = aioredis.Redis(port=master.port)
+    c_master = master.client()
     await wait_available_async(c_master)

-    await c_master.set("KEY", b"VALUE")
+    await c_master.set("KEY", "VALUE")
     db_size = await c_master.dbsize()
     assert 1 == db_size
@@ -1493,7 +1504,7 @@ async def test_replicaof_flag_disconnect(df_local_factory):
     # set up replica. check that it is replicating
     replica.start()
-    c_replica = aioredis.Redis(port=replica.port)
+    c_replica = replica.client()
     await wait_available_async(c_replica)
     await wait_for_replica_status(c_replica, status="up")
     await check_all_replicas_finished([c_replica], c_master)
@@ -1502,12 +1513,12 @@ async def test_replicaof_flag_disconnect(df_local_factory):
     assert 1 == dbsize

     val = await c_replica.get("KEY")
-    assert b"VALUE" == val
+    assert "VALUE" == val

     await c_replica.replicaof("no", "one")  # disconnect
     role = await c_replica.role()
-    assert role[0] == b"master"
+    assert role[0] == "master"


 @pytest.mark.asyncio
@@ -1525,10 +1536,10 @@ async def test_df_crash_on_memcached_error(df_local_factory):
     master.start()
     replica.start()

-    c_master = aioredis.Redis(port=master.port)
+    c_master = master.client()
     await wait_available_async(c_master)

-    c_replica = aioredis.Redis(port=replica.port)
+    c_replica = replica.client()
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
     await wait_available_async(c_replica)
     await wait_for_replica_status(c_replica, status="up")
@@ -1537,7 +1548,7 @@ async def test_df_crash_on_memcached_error(df_local_factory):
     memcached_client = pymemcache.Client(f"localhost:{replica.mc_port}")

     with pytest.raises(pymemcache.exceptions.MemcacheServerError):
-        memcached_client.set(b"key", b"data", noreply=False)
+        memcached_client.set("key", "data", noreply=False)

     await c_master.close()
@@ -1552,8 +1563,8 @@ async def test_df_crash_on_replicaof_flag(df_local_factory):
     replica = df_local_factory.create(proactor_threads=2, replicaof=f"127.0.0.1:{master.port}")
     replica.start()

-    c_master = aioredis.Redis(port=master.port)
-    c_replica = aioredis.Redis(port=replica.port, decode_responses=True)
+    c_master = master.client()
+    c_replica = replica.client()

     await wait_available_async(c_master)
     await wait_available_async(c_replica)


@@ -148,7 +148,7 @@ class TestDflyAutoLoadSnapshot(SnapshotTestBase):
         async with df_server.client() as client:
             await wait_available_async(client)
             response = await client.get("TEST")
-            assert response.decode("utf-8") == str(hash(dbfilename))
+            assert response == str(hash(dbfilename))

     @dfly_args({**BASIC_ARGS, "dbfilename": "test-periodic", "save_schedule": "*:*"})