mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat: add defragment command (#3003)
* feat: add defragment command and improve auto defragmentation algorithm
This commit is contained in:
parent
d675e63ab0
commit
3dd6c4959c
5 changed files with 93 additions and 37 deletions
|
@ -23,6 +23,7 @@ extern "C" {
|
||||||
#include "server/test_utils.h"
|
#include "server/test_utils.h"
|
||||||
|
|
||||||
ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
|
ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
|
||||||
|
ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval);
|
||||||
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
|
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
|
||||||
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
|
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
|
||||||
ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float);
|
ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float);
|
||||||
|
@ -641,7 +642,9 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
|
||||||
GTEST_SKIP() << "Defragmentation via idle task is only supported in io uring";
|
GTEST_SKIP() << "Defragmentation via idle task is only supported in io uring";
|
||||||
}
|
}
|
||||||
|
|
||||||
absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.02);
|
// mem_defrag_threshold is based on RSS statistic, but we don't count it in the test
|
||||||
|
absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.0);
|
||||||
|
absl::SetFlag(&FLAGS_mem_defrag_check_sec_interval, 0);
|
||||||
// Fill data into dragonfly and then check if we have
|
// Fill data into dragonfly and then check if we have
|
||||||
// any location in memory to defrag. See issue #448 for details about this.
|
// any location in memory to defrag. See issue #448 for details about this.
|
||||||
constexpr size_t kMaxMemoryForTest = 1'100'000;
|
constexpr size_t kMaxMemoryForTest = 1'100'000;
|
||||||
|
|
|
@ -69,6 +69,9 @@ ABSL_FLAG(string, shard_round_robin_prefix, "",
|
||||||
"support up to a few hundreds of prefixes. Note: prefix is looked inside hash tags when "
|
"support up to a few hundreds of prefixes. Note: prefix is looked inside hash tags when "
|
||||||
"cluster mode is enabled.");
|
"cluster mode is enabled.");
|
||||||
|
|
||||||
|
ABSL_FLAG(uint32_t, mem_defrag_check_sec_interval, 10,
|
||||||
|
"Number of seconds between every defragmentation necessity check");
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
using namespace tiering::literals;
|
using namespace tiering::literals;
|
||||||
|
@ -222,7 +225,6 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
|
||||||
|
|
||||||
void EngineShard::DefragTaskState::UpdateScanState(uint64_t cursor_val) {
|
void EngineShard::DefragTaskState::UpdateScanState(uint64_t cursor_val) {
|
||||||
cursor = cursor_val;
|
cursor = cursor_val;
|
||||||
underutilized_found = false;
|
|
||||||
// Once we're done with a db, jump to the next
|
// Once we're done with a db, jump to the next
|
||||||
if (cursor == kCursorDoneState) {
|
if (cursor == kCursorDoneState) {
|
||||||
dbid++;
|
dbid++;
|
||||||
|
@ -231,7 +233,6 @@ void EngineShard::DefragTaskState::UpdateScanState(uint64_t cursor_val) {
|
||||||
|
|
||||||
void EngineShard::DefragTaskState::ResetScanState() {
|
void EngineShard::DefragTaskState::ResetScanState() {
|
||||||
dbid = cursor = 0u;
|
dbid = cursor = 0u;
|
||||||
underutilized_found = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function checks 3 things:
|
// This function checks 3 things:
|
||||||
|
@ -241,8 +242,9 @@ void EngineShard::DefragTaskState::ResetScanState() {
|
||||||
// 3. in case the above is OK, make sure that we have a "gap" between usage and commited memory
|
// 3. in case the above is OK, make sure that we have a "gap" between usage and commited memory
|
||||||
// (control by mem_defrag_waste_threshold flag)
|
// (control by mem_defrag_waste_threshold flag)
|
||||||
bool EngineShard::DefragTaskState::CheckRequired() {
|
bool EngineShard::DefragTaskState::CheckRequired() {
|
||||||
if (cursor > kCursorDoneState || underutilized_found) {
|
if (is_force_defrag || cursor > kCursorDoneState) {
|
||||||
VLOG(2) << "cursor: " << cursor << " and underutilized_found " << underutilized_found;
|
is_force_defrag = false;
|
||||||
|
VLOG(2) << "cursor: " << cursor << " and is_force_defrag " << is_force_defrag;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,20 +253,35 @@ bool EngineShard::DefragTaskState::CheckRequired() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::size_t threshold_mem = memory_per_shard * GetFlag(FLAGS_mem_defrag_threshold);
|
const std::size_t global_threshold = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
|
||||||
const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
|
if (global_threshold > rss_mem_current.load(memory_order_relaxed)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto now = time(nullptr);
|
||||||
|
const auto seconds_from_prev_check = now - last_check_time;
|
||||||
|
const auto mem_defrag_interval = GetFlag(FLAGS_mem_defrag_check_sec_interval);
|
||||||
|
|
||||||
|
if (seconds_from_prev_check < mem_defrag_interval) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
last_check_time = now;
|
||||||
|
|
||||||
ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
|
ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
|
||||||
|
|
||||||
if (threshold_mem < usage.commited &&
|
const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
|
||||||
usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
|
if (usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
|
||||||
VLOG(1) << "memory issue found for memory " << usage;
|
VLOG(1) << "memory issue found for memory " << usage;
|
||||||
underutilized_found = true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void EngineShard::ForceDefrag() {
|
||||||
|
defrag_state_.is_force_defrag = true;
|
||||||
|
}
|
||||||
|
|
||||||
bool EngineShard::DoDefrag() {
|
bool EngineShard::DoDefrag() {
|
||||||
// --------------------------------------------------------------------------
|
// --------------------------------------------------------------------------
|
||||||
// NOTE: This task is running with exclusive access to the shard.
|
// NOTE: This task is running with exclusive access to the shard.
|
||||||
|
@ -341,8 +358,7 @@ uint32_t EngineShard::DefragTask() {
|
||||||
const auto shard_id = db_slice().shard_id();
|
const auto shard_id = db_slice().shard_id();
|
||||||
|
|
||||||
if (defrag_state_.CheckRequired()) {
|
if (defrag_state_.CheckRequired()) {
|
||||||
VLOG(2) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor
|
VLOG(2) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
|
||||||
<< ", underutilzation found: " << defrag_state_.underutilized_found;
|
|
||||||
if (DoDefrag()) {
|
if (DoDefrag()) {
|
||||||
// we didn't finish the scan
|
// we didn't finish the scan
|
||||||
return util::ProactorBase::kOnIdleMaxLevel;
|
return util::ProactorBase::kOnIdleMaxLevel;
|
||||||
|
|
|
@ -183,11 +183,14 @@ class EngineShard {
|
||||||
|
|
||||||
TxQueueInfo AnalyzeTxQueue() const;
|
TxQueueInfo AnalyzeTxQueue() const;
|
||||||
|
|
||||||
|
void ForceDefrag();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct DefragTaskState {
|
struct DefragTaskState {
|
||||||
size_t dbid = 0u;
|
size_t dbid = 0u;
|
||||||
uint64_t cursor = 0u;
|
uint64_t cursor = 0u;
|
||||||
bool underutilized_found = false;
|
time_t last_check_time = 0;
|
||||||
|
bool is_force_defrag = false;
|
||||||
|
|
||||||
// check the current threshold and return true if
|
// check the current threshold and return true if
|
||||||
// we need to do the defragmentation
|
// we need to do the defragmentation
|
||||||
|
|
|
@ -154,6 +154,14 @@ void MemoryCmd::Run(CmdArgList args) {
|
||||||
return Track(args);
|
return Track(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sub_cmd == "DEFRAGMENT") {
|
||||||
|
shard_set->pool()->DispatchOnAll([this](util::ProactorBase*) {
|
||||||
|
if (auto* shard = EngineShard::tlocal(); shard)
|
||||||
|
shard->ForceDefrag();
|
||||||
|
});
|
||||||
|
return cntx_->SendSimpleString("OK");
|
||||||
|
}
|
||||||
|
|
||||||
string err = UnknownSubCmd(sub_cmd, "MEMORY");
|
string err = UnknownSubCmd(sub_cmd, "MEMORY");
|
||||||
return cntx_->SendError(err, kSyntaxErrType);
|
return cntx_->SendError(err, kSyntaxErrType);
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,12 +4,13 @@ import aioredis
|
||||||
import async_timeout
|
import async_timeout
|
||||||
import sys
|
import sys
|
||||||
import argparse
|
import argparse
|
||||||
'''
|
|
||||||
|
"""
|
||||||
To install: pip install -r requirements.txt
|
To install: pip install -r requirements.txt
|
||||||
|
|
||||||
Run
|
Run
|
||||||
dragonfly --mem_defrag_threshold=0.01 --commit_use_threshold=1.2 --mem_utilization_threshold=0.8
|
dragonfly --mem_defrag_threshold=0.01 --mem_defrag_waste_threshold=0.01
|
||||||
defrag_mem_test.py -k 800000 -v 645
|
defrag_mem_test.py -k 8000000 -v 645
|
||||||
|
|
||||||
This program would try to re-create the issue with memory defragmentation.
|
This program would try to re-create the issue with memory defragmentation.
|
||||||
See issue number 448 for more details.
|
See issue number 448 for more details.
|
||||||
|
@ -29,7 +30,8 @@ To run this:
|
||||||
NOTE:
|
NOTE:
|
||||||
If this seems to get stuck please kill it with ctrl+c
|
If this seems to get stuck please kill it with ctrl+c
|
||||||
This can happen in case we don't have "defrag_realloc_total > 0"
|
This can happen in case we don't have "defrag_realloc_total > 0"
|
||||||
'''
|
"""
|
||||||
|
|
||||||
|
|
||||||
class TaskCancel:
|
class TaskCancel:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -41,14 +43,16 @@ class TaskCancel:
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.run = False
|
self.run = False
|
||||||
|
|
||||||
|
|
||||||
async def run_cmd(connection, cmd, sub_val):
|
async def run_cmd(connection, cmd, sub_val):
|
||||||
val = await connection.execute_command(cmd, sub_val)
|
val = await connection.execute_command(cmd, sub_val)
|
||||||
return val
|
return val
|
||||||
|
|
||||||
|
|
||||||
async def handle_defrag_stats(connection, prev):
|
async def handle_defrag_stats(connection, prev):
|
||||||
info = await run_cmd(connection, "info", "stats")
|
info = await run_cmd(connection, "info", "stats")
|
||||||
if info is not None:
|
if info is not None:
|
||||||
if info['defrag_task_invocation_total'] != prev:
|
if info["defrag_task_invocation_total"] != prev:
|
||||||
print("--------------------------------------------------------------")
|
print("--------------------------------------------------------------")
|
||||||
print(f"defrag_task_invocation_total: {info['defrag_task_invocation_total']:,}")
|
print(f"defrag_task_invocation_total: {info['defrag_task_invocation_total']:,}")
|
||||||
print(f"defrag_realloc_total: {info['defrag_realloc_total']:,}")
|
print(f"defrag_realloc_total: {info['defrag_realloc_total']:,}")
|
||||||
|
@ -56,22 +60,23 @@ async def handle_defrag_stats(connection, prev):
|
||||||
print("--------------------------------------------------------------")
|
print("--------------------------------------------------------------")
|
||||||
if info["defrag_realloc_total"] > 0:
|
if info["defrag_realloc_total"] > 0:
|
||||||
return True, None
|
return True, None
|
||||||
return False, info['defrag_task_invocation_total']
|
return False, info["defrag_task_invocation_total"]
|
||||||
return False, None
|
return False, None
|
||||||
|
|
||||||
|
|
||||||
async def memory_stats(connection):
|
async def memory_stats(connection):
|
||||||
print("--------------------------------------------------------------")
|
print("--------------------------------------------------------------")
|
||||||
info = await run_cmd(connection, "info", "memory")
|
info = await run_cmd(connection, "info", "memory")
|
||||||
print(f"memory commited: {info['comitted_memory']:,}")
|
# print(f"memory commited: {info['comitted_memory']:,}")
|
||||||
print(f"memory used: {info['used_memory']:,}")
|
print(f"memory used: {info['used_memory']:,}")
|
||||||
print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}")
|
# print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}")
|
||||||
print("--------------------------------------------------------------")
|
print("--------------------------------------------------------------")
|
||||||
|
|
||||||
|
|
||||||
async def stats_check(connection, condition):
|
async def stats_check(connection, condition):
|
||||||
try:
|
try:
|
||||||
defrag_task_invocation_total = 0;
|
defrag_task_invocation_total = 0
|
||||||
runs=0
|
runs = 0
|
||||||
while condition.dont_stop():
|
while condition.dont_stop():
|
||||||
await asyncio.sleep(0.3)
|
await asyncio.sleep(0.3)
|
||||||
done, d = await handle_defrag_stats(connection, defrag_task_invocation_total)
|
done, d = await handle_defrag_stats(connection, defrag_task_invocation_total)
|
||||||
|
@ -101,13 +106,15 @@ async def delete_keys(connection, keys):
|
||||||
results = await connection.delete(*keys)
|
results = await connection.delete(*keys)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
def generate_keys(pattern: str, count: int, batch_size: int) -> list:
|
def generate_keys(pattern: str, count: int, batch_size: int) -> list:
|
||||||
for i in range(1, count, batch_size):
|
for i in range(1, count, batch_size):
|
||||||
batch = [f"{pattern}{j}" for j in range(i, batch_size + i, 3)]
|
batch = [f"{pattern}{j}" for j in range(i, batch_size + i, 3)]
|
||||||
yield batch
|
yield batch
|
||||||
|
|
||||||
|
|
||||||
async def mem_cleanup(connection, pattern, num, cond, keys_count):
|
async def mem_cleanup(connection, pattern, num, cond, keys_count):
|
||||||
counter=0
|
counter = 0
|
||||||
for keys in generate_keys(pattern=pattern, count=keys_count, batch_size=950):
|
for keys in generate_keys(pattern=pattern, count=keys_count, batch_size=950):
|
||||||
if cond.dont_stop() == False:
|
if cond.dont_stop() == False:
|
||||||
print(f"task number {num} that deleted keys {pattern} finished")
|
print(f"task number {num} that deleted keys {pattern} finished")
|
||||||
|
@ -130,9 +137,17 @@ async def run_tasks(pool, key_name, value_size, keys_count):
|
||||||
tasks = []
|
tasks = []
|
||||||
count = 0
|
count = 0
|
||||||
for key in keys:
|
for key in keys:
|
||||||
pattern=f"{key}:"
|
pattern = f"{key}:"
|
||||||
print(f"deleting keys from {pattern}")
|
print(f"deleting keys from {pattern}")
|
||||||
tasks.append(mem_cleanup(connection=connection, pattern=pattern, num=count, cond=stop_cond, keys_count=int(keys_count)))
|
tasks.append(
|
||||||
|
mem_cleanup(
|
||||||
|
connection=connection,
|
||||||
|
pattern=pattern,
|
||||||
|
num=count,
|
||||||
|
cond=stop_cond,
|
||||||
|
keys_count=int(keys_count),
|
||||||
|
)
|
||||||
|
)
|
||||||
count += 1
|
count += 1
|
||||||
monitor_task = asyncio.create_task(stats_check(connection, stop_cond))
|
monitor_task = asyncio.create_task(stats_check(connection, stop_cond))
|
||||||
total = await asyncio.gather(*tasks, return_exceptions=True)
|
total = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
@ -147,29 +162,40 @@ async def run_tasks(pool, key_name, value_size, keys_count):
|
||||||
|
|
||||||
|
|
||||||
def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379):
|
def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379):
|
||||||
async_pool = aioredis.ConnectionPool(host=host, port=port,
|
async_pool = aioredis.ConnectionPool(
|
||||||
db=0, decode_responses=True, max_connections=16)
|
host=host, port=port, db=0, decode_responses=True, max_connections=16
|
||||||
|
)
|
||||||
|
|
||||||
loop = asyncio.new_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
success = loop.run_until_complete(run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count))
|
success = loop.run_until_complete(
|
||||||
|
run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count)
|
||||||
|
)
|
||||||
return success
|
return success
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = argparse.ArgumentParser(description='active memory testing', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
parser = argparse.ArgumentParser(
|
||||||
parser.add_argument('-k', '--keys', type=int, default=800000, help='total number of keys')
|
description="active memory testing", formatter_class=argparse.ArgumentDefaultsHelpFormatter
|
||||||
parser.add_argument('-v', '--value_size', type=int, default=645, help='size of the values')
|
)
|
||||||
parser.add_argument('-n', '--key_name', type=str, default="key-for-testing", help='the base key name')
|
parser.add_argument("-k", "--keys", type=int, default=800000, help="total number of keys")
|
||||||
parser.add_argument('-s', '--server', type=str, default="localhost", help='server host name')
|
parser.add_argument("-v", "--value_size", type=int, default=645, help="size of the values")
|
||||||
parser.add_argument('-p', '--port', type=int, default=6379, help='server port number')
|
parser.add_argument(
|
||||||
|
"-n", "--key_name", type=str, default="key-for-testing", help="the base key name"
|
||||||
|
)
|
||||||
|
parser.add_argument("-s", "--server", type=str, default="localhost", help="server host name")
|
||||||
|
parser.add_argument("-p", "--port", type=int, default=6379, help="server port number")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
keys_num = args.keys
|
keys_num = args.keys
|
||||||
key_name = args.key_name
|
key_name = args.key_name
|
||||||
value_size = args.value_size
|
value_size = args.value_size
|
||||||
host = args.server
|
host = args.server
|
||||||
port = args.port
|
port = args.port
|
||||||
print(f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}")
|
print(
|
||||||
result = connect_and_run(key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port)
|
f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}"
|
||||||
|
)
|
||||||
|
result = connect_and_run(
|
||||||
|
key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port
|
||||||
|
)
|
||||||
if result == True:
|
if result == True:
|
||||||
print("finished successfully")
|
print("finished successfully")
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in a new issue