mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat(server): active memory defrag test application (#552)
Signed-off-by: Boaz Sade <boaz@dragonflydb.io> Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
This commit is contained in:
parent
444f7e3f03
commit
dba4c6bd54
2 changed files with 179 additions and 0 deletions
172
tools/defrag_mem_test.py
Executable file
172
tools/defrag_mem_test.py
Executable file
|
@ -0,0 +1,172 @@
|
|||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import aioredis
|
||||
import async_timeout
|
||||
import sys
|
||||
import argparse
|
||||
'''
|
||||
To install: pip install -r requirements.txt
|
||||
|
||||
This program would try to re-create the issue with memory defragmentation.
|
||||
See issue number 448 for more details.
|
||||
To run this:
|
||||
You can just execute this from the command line without any arguemnts.
|
||||
Or you can run with --help to see the options.
|
||||
The defaults are:
|
||||
number of keys: 4,023,467
|
||||
value size: 64 bytes
|
||||
key name pattern: key-for-testing
|
||||
host: localhost
|
||||
port: default redis port
|
||||
Please note that this would create 4 * number of keys entries
|
||||
You can see the memory usage/defrag state with the monitoring task that
|
||||
prints the current state
|
||||
|
||||
NOTE:
|
||||
If this seems to get stuck please kill it with ctrl+c
|
||||
This can happen in case we don't have "defrag_realloc_total > 0"
|
||||
'''
|
||||
|
||||
class TaskCancel:
|
||||
def __init__(self):
|
||||
self.run = True
|
||||
|
||||
def dont_stop(self):
|
||||
return self.run
|
||||
|
||||
def stop(self):
|
||||
self.run = False
|
||||
|
||||
async def run_cmd(connection, cmd, sub_val):
|
||||
val = await connection.execute_command(cmd, sub_val)
|
||||
return val
|
||||
|
||||
async def handle_defrag_stats(connection, prev):
|
||||
info = await run_cmd(connection, "info", "stats")
|
||||
if info is not None:
|
||||
if info['defrag_task_invocation_total'] != prev:
|
||||
print("--------------------------------------------------------------")
|
||||
print(f"defrag_task_invocation_total: {info['defrag_task_invocation_total']:,}")
|
||||
print(f"defrag_realloc_total: {info['defrag_realloc_total']:,}")
|
||||
print(f"defrag_attempt_total: {info['defrag_attempt_total']:,}")
|
||||
print("--------------------------------------------------------------")
|
||||
if info["defrag_realloc_total"] > 0:
|
||||
return True, None
|
||||
return False, info['defrag_task_invocation_total']
|
||||
return False, None
|
||||
|
||||
async def memory_stats(connection):
|
||||
print("--------------------------------------------------------------")
|
||||
info = await run_cmd(connection, "info", "memory")
|
||||
print(f"memory commited: {info['comitted_memory']:,}")
|
||||
print(f"memory used: {info['used_memory']:,}")
|
||||
print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}")
|
||||
print("--------------------------------------------------------------")
|
||||
|
||||
|
||||
async def stats_check(connection, condition):
|
||||
try:
|
||||
defrag_task_invocation_total = 0;
|
||||
runs=0
|
||||
while condition.dont_stop():
|
||||
await asyncio.sleep(0.3)
|
||||
done, d = await handle_defrag_stats(connection, defrag_task_invocation_total)
|
||||
if done:
|
||||
print("defrag task successfully found memory locations to reallocate")
|
||||
condition.stop()
|
||||
else:
|
||||
if d is not None:
|
||||
defrag_task_invocation_total = d
|
||||
runs += 1
|
||||
if runs % 3 == 0:
|
||||
await memory_stats(connection)
|
||||
for i in range(5):
|
||||
done, d = await handle_defrag_stats(connection, -1)
|
||||
if done:
|
||||
print("defrag task successfully found memory locations to reallocate")
|
||||
return True
|
||||
else:
|
||||
await asyncio.sleep(2)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"failed to run monitor task: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def delete_keys(connection, keys):
|
||||
results = await connection.delete(*keys)
|
||||
return results
|
||||
|
||||
def generate_keys(pattern: str, count: int, batch_size: int) -> list:
|
||||
for i in range(1, count, batch_size):
|
||||
batch = [f"{pattern}{j}" for j in range(i, batch_size + i, 3)]
|
||||
yield batch
|
||||
|
||||
async def mem_cleanup(connection, pattern, num, cond, keys_count):
|
||||
counter=0
|
||||
for keys in generate_keys(pattern=pattern, count=keys_count, batch_size=950):
|
||||
if cond.dont_stop() == False:
|
||||
print(f"task number {num} that deleted keys {pattern} finished")
|
||||
return counter
|
||||
counter += await delete_keys(connection, keys)
|
||||
await asyncio.sleep(0.2)
|
||||
print(f"task number {num} that deleted keys {pattern} finished")
|
||||
return counter
|
||||
|
||||
|
||||
async def run_tasks(pool, key_name, value_size, keys_count):
|
||||
keys = [f"{key_name}-{i}" for i in range(4)]
|
||||
stop_cond = TaskCancel()
|
||||
try:
|
||||
connection = aioredis.Redis(connection_pool=pool)
|
||||
for key in keys:
|
||||
print(f"creating key {key} with size {value_size} of count {keys_count}")
|
||||
await connection.execute_command("DEBUG", "POPULATE", keys_count, key, value_size)
|
||||
await asyncio.sleep(2)
|
||||
tasks = []
|
||||
count = 0
|
||||
for key in keys:
|
||||
pattern=f"{key}:"
|
||||
print(f"deleting keys from {pattern}")
|
||||
tasks.append(mem_cleanup(connection=connection, pattern=pattern, num=count, cond=stop_cond, keys_count=int(keys_count)))
|
||||
count += 1
|
||||
monitor_task = asyncio.create_task(stats_check(connection, stop_cond))
|
||||
total = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
print(f"successfully deleted {sum(total)} keys")
|
||||
stop_cond.stop()
|
||||
await monitor_task
|
||||
print("finish executing")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"got error {e} while running delete keys")
|
||||
return False
|
||||
|
||||
|
||||
def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379):
|
||||
async_pool = aioredis.ConnectionPool(host=host, port=port,
|
||||
db=0, decode_responses=True, max_connections=16)
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
success = loop.run_until_complete(run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count))
|
||||
return success
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description='active memory testing', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument('-k', '--keys', type=int, default=4023467, help='total number of keys')
|
||||
parser.add_argument('-v', '--value_size', type=int, default=64, help='size of the values')
|
||||
parser.add_argument('-n', '--key_name', type=str, default="key-for-testing", help='the base key name')
|
||||
parser.add_argument('-s', '--server', type=str, default="localhost", help='server host name')
|
||||
parser.add_argument('-p', '--port', type=int, default=6379, help='server port number')
|
||||
args = parser.parse_args()
|
||||
keys_num = args.keys
|
||||
key_name = args.key_name
|
||||
value_size = args.value_size
|
||||
host = args.server
|
||||
port = args.port
|
||||
print(f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}")
|
||||
result = connect_and_run(key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port)
|
||||
if result == True:
|
||||
print("finished successfully")
|
||||
else:
|
||||
print("failed")
|
7
tools/requirements.txt
Normal file
7
tools/requirements.txt
Normal file
|
@ -0,0 +1,7 @@
|
|||
aioredis==2.0.1
|
||||
async_timeout==4.0.2
|
||||
pytoml==0.1.21
|
||||
PyYAML==6.0
|
||||
railroad==0.5.0
|
||||
redis==4.3.4
|
||||
requests==2.28.1
|
Loading…
Reference in a new issue