1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

Fix regression test failures on old Python versions (#1521)

Signed-off-by: adiholden <adi@dragonflydb.io>
Co-authored-by: adiholden <adi@dragonflydb.io>
This commit is contained in:
Roy Jacobson 2023-07-06 14:27:39 +02:00 committed by GitHub
parent 15481b81ce
commit 3904a4f628
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 87 deletions

View file

@ -1093,50 +1093,51 @@ async def test_take_over_counters(df_local_factory, master_threads, replica_thre
replica3 = df_local_factory.create(
port=BASE_PORT+3, proactor_threads=replica_threads)
df_local_factory.start_all([master, replica1, replica2, replica3])
async with (
master.client() as c_master,
replica1.client() as c1,
master.client() as c_blocking,
replica2.client() as c2,
replica3.client() as c3,
):
await c1.execute_command(f"REPLICAOF localhost {master.port}")
await c2.execute_command(f"REPLICAOF localhost {master.port}")
await c3.execute_command(f"REPLICAOF localhost {master.port}")
c_master = master.client()
c1 = replica1.client()
c_blocking = master.client()
c2 = replica2.client()
c3 = replica3.client()
await wait_available_async(c1)
await c1.execute_command(f"REPLICAOF localhost {master.port}")
await c2.execute_command(f"REPLICAOF localhost {master.port}")
await c3.execute_command(f"REPLICAOF localhost {master.port}")
async def counter(key):
value = 0
await c_master.execute_command(f"SET {key} 0")
start = time.time()
while time.time() - start < 20:
try:
value = await c_master.execute_command(f"INCR {key}")
except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
break
else:
assert False, "The incrementing loop should be exited with a connection error"
return key, value
await wait_available_async(c1)
async def block_during_takeover():
"Add a blocking command during takeover to make sure it doesn't block it."
start = time.time()
# The command should just be canceled
assert await c_blocking.execute_command("BLPOP BLOCKING_KEY1 BLOCKING_KEY2 100") is None
# And it should happen in reasonable amount of time.
assert time.time() - start < 10
async def counter(key):
value = 0
await c_master.execute_command(f"SET {key} 0")
start = time.time()
while time.time() - start < 20:
try:
value = await c_master.execute_command(f"INCR {key}")
except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
break
else:
assert False, "The incrementing loop should be exited with a connection error"
return key, value
async def delayed_takeover():
await asyncio.sleep(1)
await c1.execute_command(f"REPLTAKEOVER 5")
async def block_during_takeover():
"Add a blocking command during takeover to make sure it doesn't block it."
start = time.time()
# The command should just be canceled
assert await c_blocking.execute_command("BLPOP BLOCKING_KEY1 BLOCKING_KEY2 100") is None
# And it should happen in reasonable amount of time.
assert time.time() - start < 10
_, _, *results = await asyncio.gather(delayed_takeover(), block_during_takeover(), *[counter(f"key{i}") for i in range(16)])
assert await c1.execute_command("role") == [b'master', []]
async def delayed_takeover():
await asyncio.sleep(1)
await c1.execute_command(f"REPLTAKEOVER 5")
for key, client_value in results:
replicated_value = await c1.get(key)
assert client_value == int(replicated_value)
_, _, *results = await asyncio.gather(delayed_takeover(), block_during_takeover(), *[counter(f"key{i}") for i in range(16)])
assert await c1.execute_command("role") == [b'master', []]
for key, client_value in results:
replicated_value = await c1.get(key)
assert client_value == int(replicated_value)
await disconnect_clients(c_master, c1, c_blocking, c2, c3)
@pytest.mark.parametrize("master_threads, replica_threads", take_over_cases)
@ -1150,35 +1151,38 @@ async def test_take_over_seeder(df_local_factory, df_seeder_factory, master_thre
port=BASE_PORT+1, proactor_threads=replica_threads)
df_local_factory.start_all([master, replica])
seeder = df_seeder_factory.create(port=master.port, keys=1000, dbcount=5, stop_on_failure=False)
async with (
master.client() as c_master,
replica.client() as c_replica,
):
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
seeder = df_seeder_factory.create(
port=master.port, keys=1000, dbcount=5, stop_on_failure=False)
async def seed():
await seeder.run(target_ops=3000)
c_master = master.client()
c_replica = replica.client()
fill_task = asyncio.create_task(seed())
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
# Give the seeder a bit of time.
await asyncio.sleep(1)
await c_replica.execute_command(f"REPLTAKEOVER 5")
seeder.stop()
async def seed():
await seeder.run(target_ops=3000)
assert await c_replica.execute_command("role") == [b'master', []]
fill_task = asyncio.create_task(seed())
# Need to wait a bit to give time to write the shutdown snapshot
await asyncio.sleep(1)
assert master.proc.poll() == 0, "Master process did not exit correctly."
# Give the seeder a bit of time.
await asyncio.sleep(1)
await c_replica.execute_command(f"REPLTAKEOVER 5")
seeder.stop()
master.start()
await wait_available_async(c_master)
assert await c_replica.execute_command("role") == [b'master', []]
capture = await seeder.capture()
assert await seeder.compare(capture, port=replica.port)
# Need to wait a bit to give time to write the shutdown snapshot
await asyncio.sleep(1)
assert master.proc.poll() == 0, "Master process did not exit correctly."
master.start()
await wait_available_async(c_master)
capture = await seeder.capture()
assert await seeder.compare(capture, port=replica.port)
await disconnect_clients(c_master, c_replica)
@pytest.mark.asyncio
@ -1192,32 +1196,34 @@ async def test_take_over_timeout(df_local_factory, df_seeder_factory):
seeder = df_seeder_factory.create(
port=master.port, keys=1000, dbcount=5, stop_on_failure=False)
async with (
master.client() as c_master,
replica.client() as c_replica,
):
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
async def seed():
await seeder.run(target_ops=3000)
c_master = master.client()
c_replica = replica.client()
fill_task = asyncio.create_task(seed())
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
# Give the seeder a bit of time.
await asyncio.sleep(1)
try:
await c_replica.execute_command(f"REPLTAKEOVER 0.0001")
except redis.exceptions.ResponseError as e:
assert str(e) == "Couldn't execute takeover"
else:
assert False, "Takeover should not succeed."
seeder.stop()
await fill_task
async def seed():
await seeder.run(target_ops=3000)
assert await c_master.execute_command("role") == [b'master', [[b'127.0.0.1', bytes(str(replica.port), 'ascii'), b'stable_sync']]]
assert await c_replica.execute_command("role") == [b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'stable_sync']
fill_task = asyncio.create_task(seed())
# Give the seeder a bit of time.
await asyncio.sleep(1)
try:
await c_replica.execute_command(f"REPLTAKEOVER 0.0001")
except redis.exceptions.ResponseError as e:
assert str(e) == "Couldn't execute takeover"
else:
assert False, "Takeover should not succeed."
seeder.stop()
await fill_task
assert await c_master.execute_command("role") == [b'master', [[b'127.0.0.1', bytes(str(replica.port), 'ascii'), b'stable_sync']]]
assert await c_replica.execute_command("role") == [b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'stable_sync']
await disconnect_clients(c_master, c_replica)
# 1. Number of master threads
# 2. Number of threads for each replica

View file

@ -65,13 +65,16 @@ async def test_connection_name(async_client: aioredis.Redis):
async def test_client_list(df_factory):
instance = df_factory.create(port=1111, admin_port=1112)
instance.start()
async with (aioredis.Redis(port=instance.port) as client, aioredis.Redis(port=instance.admin_port) as admin_client):
await client.ping()
await admin_client.ping()
assert len(await client.execute_command("CLIENT LIST")) == 2
assert len(await admin_client.execute_command("CLIENT LIST")) == 2
instance.stop()
client = aioredis.Redis(port=instance.port)
admin_client = aioredis.Redis(port=instance.admin_port)
await client.ping()
await admin_client.ping()
assert len(await client.execute_command("CLIENT LIST")) == 2
assert len(await admin_client.execute_command("CLIENT LIST")) == 2
instance.stop()
await disconnect_clients(client, admin_client)
async def test_scan(async_client: aioredis.Redis):
'''

View file

@ -556,3 +556,7 @@ class DflySeederFactory:
def create(self, **kwargs):
return DflySeeder(log_file=self.log_file, **kwargs)
async def disconnect_clients(*clients):
await asyncio.gather(*(c.connection_pool.disconnect() for c in clients))