1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(server): memory defrag support - unit tests added to verify #448 (#523)

feat(server): active memory defrag task #448

Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
This commit is contained in:
Boaz Sade 2022-12-04 16:00:12 +02:00 committed by GitHub
parent 8bff194f83
commit 6ec4cf078c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 174 additions and 36 deletions

View file

@ -39,6 +39,7 @@ using absl::GetFlag;
namespace {
constexpr XXH64_hash_t kHashSeed = 24061983;
constexpr size_t kAlignSize = 8u;
// Approximation since does not account for listpacks.
size_t QlMAllocSize(quicklist* ql) {
@ -217,7 +218,7 @@ struct TL {
thread_local TL tl;
constexpr bool kUseSmallStrings = true;
constexpr bool kUseSmallStrings = false;
/// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate
/// file and implement with SIMD instructions.
@ -380,6 +381,23 @@ void RobjWrapper::SetString(string_view s, pmr::memory_resource* mr) {
}
}
// Moves the string payload to a fresh allocation when the page that currently
// holds it is reported as underutilized for the given `ratio`.
// Returns the result of Reallocate() when a move was attempted and false
// otherwise; objects that are not strings are never touched.
bool RobjWrapper::DefragIfNeeded(float ratio) {
  if (type() != OBJ_STRING) {
    return false;  // only applicable to strings
  }
  if (!zmalloc_page_is_underutilized(inner_obj(), ratio)) {
    return false;  // the page is utilized well enough, nothing to do
  }
  return Reallocate(tl.local_mr);
}
// Copies the sz_ bytes of the current payload into a new buffer obtained from
// `mr` (aligned to kAlignSize), makes inner_obj_ point at the copy and releases
// the previous buffer through the same resource. Always returns true.
bool RobjWrapper::Reallocate(std::pmr::memory_resource* mr) {
  void* const fresh = mr->allocate(sz_, kAlignSize);
  void* const stale = inner_obj_;
  memcpy(fresh, stale, sz_);
  inner_obj_ = fresh;
  // NOTE(review): the old buffer is released with a byte count of 0 -
  // presumably the memory resource in use ignores the size argument; confirm.
  mr->deallocate(stale, 0, kAlignSize);
  return true;
}
void RobjWrapper::Init(unsigned type, unsigned encoding, void* inner) {
type_ = type;
encoding_ = encoding;
@ -398,13 +416,13 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
desired += SDS_MAX_PREALLOC;
}
void* newp = mr->allocate(desired, 8);
void* newp = mr->allocate(desired, kAlignSize);
if (sz_) {
memcpy(newp, inner_obj_, sz_);
}
if (current_cap) {
mr->deallocate(inner_obj_, current_cap, 8);
mr->deallocate(inner_obj_, current_cap, kAlignSize);
}
inner_obj_ = newp;
}
@ -659,7 +677,7 @@ robj* CompactObj::AsRObj() const {
res->type = u_.r_obj.type();
if (res->type == OBJ_SET) {
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
}
if (res->type == OBJ_HASH) {
@ -850,6 +868,28 @@ string_view CompactObj::GetSlice(string* scratch) const {
return string_view{};
}
// Tries to relocate the heap payload of this object if it resides on an
// underutilized memory page, as judged with the given `ratio`.
// Only ROBJ_TAG objects that hold a non-null inner object are forwarded to
// RobjWrapper::DefragIfNeeded. Every other representation (SMALL_TAG, INT_TAG,
// EXTERNAL_TAG and inline strings) is left untouched and yields false.
bool CompactObj::DefragIfNeeded(float ratio) {
  // TODO - support SMALL_TAG later
  const bool is_robj = taglen_ == ROBJ_TAG;
  if (is_robj && u_.r_obj.inner_obj() != nullptr) {
    return u_.r_obj.DefragIfNeeded(ratio);
  }
  return false;
}
bool CompactObj::HasAllocated() const {
if (IsRef() || taglen_ == INT_TAG || IsInline() || taglen_ == EXTERNAL_TAG ||
(taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr))

View file

@ -16,7 +16,7 @@ typedef struct redisObject robj;
namespace dfly {
constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
constexpr unsigned kEncodingListPack = 3;
@ -52,7 +52,10 @@ class RobjWrapper {
return std::string_view{reinterpret_cast<char*>(inner_obj_), sz_};
}
bool DefragIfNeeded(float ratio);
private:
bool Reallocate(std::pmr::memory_resource* mr);
size_t InnerObjMallocUsed() const;
void MakeInnerRoom(size_t current_cap, size_t desired, std::pmr::memory_resource* mr);
@ -208,6 +211,8 @@ class CompactObj {
return mask_ & IO_PENDING;
}
bool DefragIfNeeded(float ratio);
void SetIoPending(bool b) {
if (b) {
mask_ |= IO_PENDING;

View file

@ -342,7 +342,7 @@ TEST_F(CompactObjectTest, MimallocUnderutilzationWithRealloc) {
bool found = HasUnderutilizedMemory(ptrs, kUnderUtilizedRatio);
ASSERT_FALSE(found);
DeallocateAtRandom(kRandomStep, &ptrs);
// TestMiMallocUnderutilized(ptrs, run_reallocation, allocation_size);
// This is another case, where we are filling the "gaps" by doing re-allocations
// in this case, since we are not setting all the values back it should still have
// places that are not used. Plus since we are not looking at the first page

View file

@ -341,6 +341,7 @@ Usage: dragonfly [FLAGS]
}
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);
mi_option_set(mi_option_decommit_delay, 0);
base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);

View file

@ -758,21 +758,66 @@ TEST_F(DflyEngineTest, Bug496) {
TEST_F(DefragDflyEngineTest, TestDefragOption) {
// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
EXPECT_GE(max_memory_limit, 100'000);
const int NUMBER_OF_KEYS = 184000;
max_memory_limit = 300'000; // control memory size so no need for too many keys
constexpr int kNumberOfKeys = 100'000; // this fill the memory
constexpr int kKeySize = 137;
constexpr int kMaxDefragTriesForTests = 10;
RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"});
std::vector<std::string> keys2delete;
keys2delete.push_back("del");
// Generate the list of keys that will be deleted.
// The keys that we delete are all of the form "key-name:1<other digits>".
// This is because we populate keys that have this format, but we don't want
// to delete all keys, only some random keys, so we delete those that start with 1.
constexpr int kFactor = 10;
int kMaxNumKeysToDelete = 10'000;
int current_step = kFactor;
for (int i = 1; i < kMaxNumKeysToDelete; current_step *= kFactor) {
for (; i < current_step; i++) {
int j = i - 1 + current_step;
keys2delete.push_back("key-name:" + std::to_string(j));
}
}
std::vector<std::string_view> keys(keys2delete.begin(), keys2delete.end());
RespExpr resp = Run(
{"DEBUG", "POPULATE", std::to_string(kNumberOfKeys), "key-name", std::to_string(kKeySize)});
ASSERT_EQ(resp, "OK");
resp = Run({"DBSIZE"});
EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS));
EXPECT_THAT(resp, IntArg(kNumberOfKeys));
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
this_fiber::sleep_for(100ms);
EXPECT_EQ(shard->GetDefragStats().success_count, 0);
// we are not running stats yet
EXPECT_EQ(shard->GetDefragStats().tries, 0);
EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS);
// we are expecting to have at least one try by now
EXPECT_GT(shard->GetDefragStats().tries, 0);
});
ArgSlice delete_cmd(keys);
auto r = CheckedInt(delete_cmd);
// the first element in this is the command del so size is one less
ASSERT_EQ(r, keys2delete.size() - 1);
// At this point we need to check whether the task ran and whether it did something
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
// a "busy wait" to ensure that memory defragmentation was successful:
// the task ran and did its work
auto stats = shard->GetDefragStats();
for (int i = 0; i < kMaxDefragTriesForTests; i++) {
stats = shard->GetDefragStats();
if (stats.success_count > 0) {
break;
}
this_fiber::sleep_for(220ms);
}
// make sure that we successfully found places to defrag in memory
EXPECT_GT(stats.success_count, 0);
});
}

View file

@ -51,7 +51,8 @@ using absl::GetFlag;
namespace {
constexpr DbIndex DEFAULT_DB_INDEX = 0;
constexpr DbIndex kDefaultDbIndex = 0;
constexpr uint64_t kCursorDoneState = 0u;
vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init
@ -80,7 +81,7 @@ bool EngineShard::DefragTaskState::IsRequired() {
const uint64_t threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);
if (cursor > 0) {
if (cursor > kCursorDoneState) {
return true;
}
@ -100,34 +101,73 @@ bool EngineShard::DefragTaskState::IsRequired() {
// for now this does nothing
bool EngineShard::DoDefrag() {
// TODO - Impl!!
return defrag_state_.cursor > 0;
}
// --------------------------------------------------------------------------
// NOTE: This task is running with exclusive access to the shard.
// i.e. - Since we are using shared-nothing access here, and all accesses
// are done using fibers, this fiber is run only when no other fiber in the
// context of the controlling thread will access this shard!
// --------------------------------------------------------------------------
void EngineShard::DefragTaskState::Init() {
cursor = 0u;
constexpr size_t kMaxTraverses = 50;
const float threshold = GetFlag(FLAGS_mem_utilization_threshold);
auto& slice = db_slice();
DCHECK(slice.IsDbValid(kDefaultDbIndex));
auto [prime_table, expire_table] = slice.GetTables(kDefaultDbIndex);
PrimeTable::Cursor cur = defrag_state_.cursor;
uint64_t defrag_count = 0;
unsigned traverses_count = 0;
do {
cur = prime_table->Traverse(cur, [&](PrimeIterator it) {
// for each value check whether we should move it because it
// sits on an underutilized page of memory, and if so, do it.
bool did = it->second.DefragIfNeeded(threshold);
if (did) {
defrag_count++;
}
});
traverses_count++;
} while (traverses_count < kMaxTraverses && cur);
defrag_state_.cursor = cur.value();
if (defrag_count > 0) {
VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << defrag_count
<< " times, did it in " << traverses_count << " cursor is at the "
<< (defrag_state_.cursor == 0 ? "end" : "in progress");
} else {
VLOG(1) << "shard " << slice.shard_id() << ": run the defrag " << traverses_count
<< " times out of maximum " << kMaxTraverses << ", with cursor at "
<< (defrag_state_.cursor == 0 ? "end" : "in progress")
<< " but no location for defrag were found";
}
defrag_state_.stats.success_count += defrag_count;
defrag_state_.stats.tries++;
return defrag_state_.cursor > kCursorDoneState;
}
// the memory defragmentation task is as follows:
// 1. Check if memory usage is high enough
// 2. Check if diff between committed and used memory is high enough
// 3. Check if we have memory changes (to ensure that we are not running endlessly). - TODO
// 4. if all the above pass -> run on the shard and try to defragment memory by re-allocating
// values
// if the cursor signals that we are not done, schedule the task to run at high
// priority, otherwise lower the task priority so that it would not use the CPU when not required
// 4. if all the above pass -> scan this shard and check whether values that reside on
// underutilized pages can be moved
// if the cursor returned from the scan is not in the done state, schedule the task to run at
// high priority.
// otherwise lower the task priority so that it would not use the CPU when not required
uint32_t EngineShard::DefragTask() {
constexpr uint32_t kRunAtLowPriority = 0u;
const auto shard_id = db_slice().shard_id();
bool required_state = defrag_state_.IsRequired();
if (required_state) {
if (defrag_state_.IsRequired()) {
VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
if (DoDefrag()) {
// we didn't finish the scan
return util::ProactorBase::kOnIdleMaxLevel;
}
}
// by default we just want to not get in the way..
return 0u;
return kRunAtLowPriority;
}
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
@ -150,7 +190,6 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
// start the defragmentation task here
defrag_state_.Init();
defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); });
}
@ -170,9 +209,7 @@ void EngineShard::Shutdown() {
ProactorBase::me()->CancelPeriodic(periodic_task_);
}
if (defrag_task_ != 0) {
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {

View file

@ -158,8 +158,6 @@ class EngineShard {
uint64_t cursor = 0u;
DefragStats stats;
void Init();
// check the current threshold and return true if
// we need to do the defragmentation
bool IsRequired();

View file

@ -26,6 +26,15 @@ ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to
namespace dfly {
// Streams the elements of `list` as a comma separated sequence enclosed in
// square brackets, e.g. "[a, b, c]"; an empty list is written as "[]".
// Returns `os` to allow chaining.
std::ostream& operator<<(std::ostream& os, ArgSlice& list) {
  os << "[";
  auto it = list.begin();
  const auto last = list.end();
  if (it != last) {
    // The first element carries no separator; every later one is preceded by ", ".
    os << *it;
    for (++it; it != last; ++it) {
      os << ", " << *it;
    }
  }
  return os << "]";
}
extern unsigned kInitSegmentLog;
using MP = MemcacheParser;
@ -267,7 +276,7 @@ auto BaseFamilyTest::GetMC(MP::CmdType cmd_type, std::initializer_list<std::stri
return conn->SplitLines();
}
int64_t BaseFamilyTest::CheckedInt(std::initializer_list<std::string_view> list) {
int64_t BaseFamilyTest::CheckedInt(ArgSlice list) {
RespExpr resp = Run(list);
if (resp.type == RespExpr::INT64) {
return get<int64_t>(resp.u);

View file

@ -56,7 +56,10 @@ class BaseFamilyTest : public ::testing::Test {
MCResponse RunMC(MemcacheParser::CmdType cmd_type, std::string_view key = std::string_view{});
MCResponse GetMC(MemcacheParser::CmdType cmd_type, std::initializer_list<std::string_view> list);
int64_t CheckedInt(std::initializer_list<std::string_view> list);
int64_t CheckedInt(std::initializer_list<std::string_view> list) {
return CheckedInt(ArgSlice{list.begin(), list.size()});
}
int64_t CheckedInt(ArgSlice list);
bool IsLocked(DbIndex db_index, std::string_view key) const;
ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const;