mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-15 17:51:06 +00:00
7944af3c62
Add black formatter and run it on pytests
60 lines
1.9 KiB
Python
Executable file
60 lines
1.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
"""
|
|
This is the script that helped to reproduce https://github.com/dragonflydb/dragonfly/issues/150
|
|
The outcome - stalled code with all its connections deadlocked.
|
|
Reproduced only with dragonfly in release mode on multi-core machine.
|
|
"""
|
|
|
|
import asyncio
|
|
import aioredis
|
|
|
|
from loguru import logger as log
|
|
import sys
|
|
import random
|
|
|
|
connection_pool = aioredis.ConnectionPool(
|
|
host="localhost", port=6379, db=1, decode_responses=True, max_connections=16
|
|
)
|
|
|
|
|
|
key_index = 1
|
|
|
|
|
|
async def post_to_redis(sem, db_name, index):
|
|
global key_index
|
|
async with sem:
|
|
results = None
|
|
try:
|
|
redis_client = aioredis.Redis(connection_pool=connection_pool)
|
|
async with redis_client.pipeline(transaction=True) as pipe:
|
|
for i in range(1, 15):
|
|
pipe.hsetnx(name=f"key_{key_index}", key="name", value="bla")
|
|
key_index += 1
|
|
# log.info(f"after first half {key_index}")
|
|
for i in range(1, 15):
|
|
pipe.hsetnx(name=f"bla_{key_index}", key="name2", value="bla")
|
|
key_index += 1
|
|
assert len(pipe.command_stack) > 0
|
|
log.info(f"before pipe.execute {key_index}")
|
|
results = await pipe.execute()
|
|
log.info(f"after pipe.execute {key_index}")
|
|
finally:
|
|
# log.info(f"before close {index}")
|
|
await redis_client.close()
|
|
# log.info(f"after close {index} {len(results)}")
|
|
|
|
|
|
async def do_concurrent(db_name):
|
|
tasks = []
|
|
sem = asyncio.Semaphore(10)
|
|
for i in range(1, 3000):
|
|
tasks.append(post_to_redis(sem, db_name, i))
|
|
res = await asyncio.gather(*tasks)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
log.remove()
|
|
log.add(sys.stdout, enqueue=True, level="INFO")
|
|
loop = asyncio.get_event_loop()
|
|
loop.run_until_complete(do_concurrent("my_db"))
|