dragonflydb-dragonfly/tests/async.py
Roman Gershman 4ec2538204
fix(transaction): Fix rare deadlock case - fixes #150. (#155)
In rare cases a scheduled transaction is not scheduled correctly and we need
to remove it from the tx-queue in order to re-schedule it. When we pull it from the tx-queue
and it was located at the head, we must poll-execute the next txs in the queue.
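
As a rough illustration of the failure mode and the fix, here is a minimal Python
sketch of a per-shard transaction queue. It is not Dragonfly's C++ implementation;
the single-queue model and the names TxQueue, cancel_and_reschedule, poll_execute,
is_ready and run are illustrative assumptions only.

from collections import deque

class TxQueue:
    """Toy model of one shard's transaction queue (illustrative only)."""

    def __init__(self):
        self.txs = deque()   # pending transactions, head at index 0

    def schedule(self, tx):
        self.txs.append(tx)

    def cancel_and_reschedule(self, tx):
        # Pull an incorrectly scheduled transaction out of the queue.
        was_head = bool(self.txs) and self.txs[0] is tx
        self.txs.remove(tx)
        if was_head:
            # The removed tx was blocking the head of the queue, so the
            # transactions behind it must be poll-executed now; otherwise
            # they wait forever and every connection behind them deadlocks.
            self.poll_execute()
        self.schedule(tx)    # re-schedule the pulled tx at the tail

    def poll_execute(self):
        # Run every transaction that is now ready at the head of the queue.
        while self.txs and self.txs[0].is_ready():
            self.txs.popleft().run()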

1. Fix the bug.
2. Improve verbose logging to make it easier to follow the tx flow in release mode.
3. Introduce a /txz handler that shows the currently pending transactions in the queue
   (see the sketch after this list).
4. Fix a typo in the xdel() function.
5. Add a py-script that reproduces the bug.
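
For item 3, a hedged example of peeking at the new /txz page, assuming dragonfly
serves its HTTP console on the same port as the RESP listener (localhost:6379 here);
the exact output format is not specified in this change:

import urllib.request

# Dump the pending-transactions page; adjust host/port if the HTTP console
# is exposed elsewhere in your setup.
with urllib.request.urlopen("http://localhost:6379/txz") as resp:
    print(resp.read().decode())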

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
2022-06-15 16:53:27 +03:00


#!/usr/bin/env python3
"""
This script helped reproduce https://github.com/dragonflydb/dragonfly/issues/150.
The outcome: the client code stalls with all of its connections deadlocked.
Reproduced only with dragonfly in release mode on a multi-core machine.
"""
import asyncio
import aioredis
from loguru import logger as log
import sys
import random

connection_pool = aioredis.ConnectionPool(host="localhost", port=6379,
                                          db=1, decode_responses=True, max_connections=16)

key_index = 1


async def post_to_redis(sem, db_name, index):
    global key_index
    async with sem:
        results = None
        try:
            redis_client = aioredis.Redis(connection_pool=connection_pool)
            # Each task sends a single MULTI/EXEC pipeline of 28 HSETNX commands.
            async with redis_client.pipeline(transaction=True) as pipe:
                for i in range(1, 15):
                    pipe.hsetnx(name=f"key_{key_index}", key="name", value="bla")
                    key_index += 1
                # log.info(f"after first half {key_index}")
                for i in range(1, 15):
                    pipe.hsetnx(name=f"bla_{key_index}", key="name2", value="bla")
                    key_index += 1
                assert len(pipe.command_stack) > 0
                log.info(f"before pipe.execute {key_index}")
                results = await pipe.execute()
                log.info(f"after pipe.execute {key_index}")
        finally:
            # log.info(f"before close {index}")
            await redis_client.close()
            # log.info(f"after close {index} {len(results)}")


async def do_concurrent(db_name):
    # Fan out ~3000 pipeline tasks, at most 10 in flight at a time.
    tasks = []
    sem = asyncio.Semaphore(10)
    for i in range(1, 3000):
        tasks.append(post_to_redis(sem, db_name, i))
    res = await asyncio.gather(*tasks)


if __name__ == '__main__':
    log.remove()
    log.add(sys.stdout, enqueue=True, level='INFO')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_concurrent("my_db"))
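
With the bug present (a release-mode dragonfly on a multi-core machine), the expected
symptom is that the script stops making progress after logging "before pipe.execute":
the matching "after pipe.execute" line never appears, and the stuck transactions
should remain visible via the /txz handler introduced in this change.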