diff --git a/tools/defrag_mem_test.py b/tools/defrag_mem_test.py new file mode 100755 index 000000000..de12a38dc --- /dev/null +++ b/tools/defrag_mem_test.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +import asyncio +import aioredis +import async_timeout +import sys +import argparse +''' +To install: pip install -r requirements.txt + +This program would try to re-create the issue with memory defragmentation. +See issue number 448 for more details. +To run this: + You can just execute this from the command line without any arguemnts. + Or you can run with --help to see the options. + The defaults are: + number of keys: 4,023,467 + value size: 64 bytes + key name pattern: key-for-testing + host: localhost + port: default redis port + Please note that this would create 4 * number of keys entries + You can see the memory usage/defrag state with the monitoring task that + prints the current state + +NOTE: + If this seems to get stuck please kill it with ctrl+c + This can happen in case we don't have "defrag_realloc_total > 0" +''' + +class TaskCancel: + def __init__(self): + self.run = True + + def dont_stop(self): + return self.run + + def stop(self): + self.run = False + +async def run_cmd(connection, cmd, sub_val): + val = await connection.execute_command(cmd, sub_val) + return val + +async def handle_defrag_stats(connection, prev): + info = await run_cmd(connection, "info", "stats") + if info is not None: + if info['defrag_task_invocation_total'] != prev: + print("--------------------------------------------------------------") + print(f"defrag_task_invocation_total: {info['defrag_task_invocation_total']:,}") + print(f"defrag_realloc_total: {info['defrag_realloc_total']:,}") + print(f"defrag_attempt_total: {info['defrag_attempt_total']:,}") + print("--------------------------------------------------------------") + if info["defrag_realloc_total"] > 0: + return True, None + return False, info['defrag_task_invocation_total'] + return False, None + +async def memory_stats(connection): + print("--------------------------------------------------------------") + info = await run_cmd(connection, "info", "memory") + print(f"memory commited: {info['comitted_memory']:,}") + print(f"memory used: {info['used_memory']:,}") + print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}") + print("--------------------------------------------------------------") + + +async def stats_check(connection, condition): + try: + defrag_task_invocation_total = 0; + runs=0 + while condition.dont_stop(): + await asyncio.sleep(0.3) + done, d = await handle_defrag_stats(connection, defrag_task_invocation_total) + if done: + print("defrag task successfully found memory locations to reallocate") + condition.stop() + else: + if d is not None: + defrag_task_invocation_total = d + runs += 1 + if runs % 3 == 0: + await memory_stats(connection) + for i in range(5): + done, d = await handle_defrag_stats(connection, -1) + if done: + print("defrag task successfully found memory locations to reallocate") + return True + else: + await asyncio.sleep(2) + return True + except Exception as e: + print(f"failed to run monitor task: {e}") + return False + + +async def delete_keys(connection, keys): + results = await connection.delete(*keys) + return results + +def generate_keys(pattern: str, count: int, batch_size: int) -> list: + for i in range(1, count, batch_size): + batch = [f"{pattern}{j}" for j in range(i, batch_size + i, 3)] + yield batch + +async def mem_cleanup(connection, pattern, num, cond, keys_count): + counter=0 + for keys in generate_keys(pattern=pattern, count=keys_count, batch_size=950): + if cond.dont_stop() == False: + print(f"task number {num} that deleted keys {pattern} finished") + return counter + counter += await delete_keys(connection, keys) + await asyncio.sleep(0.2) + print(f"task number {num} that deleted keys {pattern} finished") + return counter + + +async def run_tasks(pool, key_name, value_size, keys_count): + keys = [f"{key_name}-{i}" for i in range(4)] + stop_cond = TaskCancel() + try: + connection = aioredis.Redis(connection_pool=pool) + for key in keys: + print(f"creating key {key} with size {value_size} of count {keys_count}") + await connection.execute_command("DEBUG", "POPULATE", keys_count, key, value_size) + await asyncio.sleep(2) + tasks = [] + count = 0 + for key in keys: + pattern=f"{key}:" + print(f"deleting keys from {pattern}") + tasks.append(mem_cleanup(connection=connection, pattern=pattern, num=count, cond=stop_cond, keys_count=int(keys_count))) + count += 1 + monitor_task = asyncio.create_task(stats_check(connection, stop_cond)) + total = await asyncio.gather(*tasks, return_exceptions=True) + print(f"successfully deleted {sum(total)} keys") + stop_cond.stop() + await monitor_task + print("finish executing") + return True + except Exception as e: + print(f"got error {e} while running delete keys") + return False + + +def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379): + async_pool = aioredis.ConnectionPool(host=host, port=port, + db=0, decode_responses=True, max_connections=16) + + loop = asyncio.new_event_loop() + success = loop.run_until_complete(run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count)) + return success + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='active memory testing', formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('-k', '--keys', type=int, default=4023467, help='total number of keys') + parser.add_argument('-v', '--value_size', type=int, default=64, help='size of the values') + parser.add_argument('-n', '--key_name', type=str, default="key-for-testing", help='the base key name') + parser.add_argument('-s', '--server', type=str, default="localhost", help='server host name') + parser.add_argument('-p', '--port', type=int, default=6379, help='server port number') + args = parser.parse_args() + keys_num = args.keys + key_name = args.key_name + value_size = args.value_size + host = args.server + port = args.port + print(f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}") + result = connect_and_run(key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port) + if result == True: + print("finished successfully") + else: + print("failed") diff --git a/tools/requirements.txt b/tools/requirements.txt new file mode 100644 index 000000000..f9ede589b --- /dev/null +++ b/tools/requirements.txt @@ -0,0 +1,7 @@ +aioredis==2.0.1 +async_timeout==4.0.2 +pytoml==0.1.21 +PyYAML==6.0 +railroad==0.5.0 +redis==4.3.4 +requests==2.28.1