1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: add defrag logic for zsets (#3836)

* add defrag logic for zsets
* add tests

---------

Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
Kostas Kyrimis 2024-10-23 10:56:56 +03:00 committed by GitHub
parent ea9dc9c454
commit c5a8008348
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 210 additions and 12 deletions

View file

@ -171,6 +171,11 @@ pair<void*, bool> DefragIntSet(intset* is, float ratio) {
return {replacement, true};
}
// Defragments the given SortedMap in place.
// The container pointer itself is never replaced, so it is returned unchanged
// together with a flag telling whether DefragIfNeeded() reported a re-allocation.
pair<void*, bool> DefragSortedMap(detail::SortedMap* sm, float ratio) {
  pair<void*, bool> result{sm, false};
  result.second = sm->DefragIfNeeded(ratio);
  return result;
}
// Iterates over allocations of internal hash data structures and re-allocates
// them if their pages are underutilized.
// Returns pointer to new object ptr and whether any re-allocations happened.
@ -208,6 +213,23 @@ pair<void*, bool> DefragSet(unsigned encoding, void* ptr, float ratio) {
}
}
// Dispatches sorted-set defragmentation according to the object's encoding.
// Returns the (possibly new) object pointer and whether any re-allocation happened.
pair<void*, bool> DefragZSet(unsigned encoding, void* ptr, float ratio) {
  // A listpack is one contiguous buffer, so it is handled as a single allocation.
  if (encoding == OBJ_ENCODING_LISTPACK) {
    return DefragListPack(static_cast<uint8_t*>(ptr), ratio);
  }

  // Despite its name, the SKIPLIST encoding is backed by detail::SortedMap.
  if (encoding == OBJ_ENCODING_SKIPLIST) {
    return DefragSortedMap(static_cast<detail::SortedMap*>(ptr), ratio);
  }

  // No other encodings are expected for sorted sets.
  ABSL_UNREACHABLE();
}
inline void FreeObjStream(void* ptr) {
freeStream((stream*)ptr);
}
@ -420,19 +442,23 @@ void RobjWrapper::SetString(string_view s, MemoryResource* mr) {
}
// Tries to reduce memory fragmentation of the wrapped object.
//
// `ratio` is forwarded to the page-utilization checks of the underlying helpers.
// Returns true if a re-allocation happened, false otherwise (including for object
// types that have no defragmentation support here).
bool RobjWrapper::DefragIfNeeded(float ratio) {
  // Runs a type-specific defrag routine and stores the pointer it hands back, since
  // the routine may have replaced the underlying object.
  auto do_defrag = [this, ratio](auto defrag_fun) {
    auto [new_ptr, realloced] = defrag_fun(encoding_, inner_obj_, ratio);
    inner_obj_ = new_ptr;
    return realloced;
  };

  if (type() == OBJ_STRING) {
    // Strings are a single allocation: move them only if their page is underutilized.
    if (zmalloc_page_is_underutilized(inner_obj(), ratio)) {
      ReallocateString(tl.local_mr);
      return true;
    }
    return false;
  }

  // All container types share the same "call and store the new pointer" flow.
  if (type() == OBJ_HASH) {
    return do_defrag(DefragHash);
  }
  if (type() == OBJ_SET) {
    return do_defrag(DefragSet);
  }
  if (type() == OBJ_ZSET) {
    return do_defrag(DefragZSet);
  }

  return false;
}
@ -440,11 +466,11 @@ bool RobjWrapper::DefragIfNeeded(float ratio) {
int RobjWrapper::ZsetAdd(double score, sds ele, int in_flags, int* out_flags, double* newscore) {
// copied from zsetAdd for listpack only.
/* Turn options into simple to check vars. */
int incr = (in_flags & ZADD_IN_INCR) != 0;
int nx = (in_flags & ZADD_IN_NX) != 0;
int xx = (in_flags & ZADD_IN_XX) != 0;
int gt = (in_flags & ZADD_IN_GT) != 0;
int lt = (in_flags & ZADD_IN_LT) != 0;
bool incr = (in_flags & ZADD_IN_INCR) != 0;
bool nx = (in_flags & ZADD_IN_NX) != 0;
bool xx = (in_flags & ZADD_IN_XX) != 0;
bool gt = (in_flags & ZADD_IN_GT) != 0;
bool lt = (in_flags & ZADD_IN_LT) != 0;
*out_flags = 0; /* We'll return our response flags. */
double curscore;

View file

@ -146,4 +146,43 @@ detail::SdsScorePair ScoreMap::iterator::BreakToPair(void* obj) {
return detail::SdsScorePair(f, GetValue(f));
}
namespace {
// Copies the sds `obj` (together with the 8 extra bytes stored after its terminator)
// into a fresh allocation when its page is underutilized according to `ratio`.
// Returns {pointer to use, whether a new allocation was made}.
// The old object is NOT released here; callers must free it when the flag is true.
pair<sds, bool> ReallocIfNeededGeneric(void* obj, float ratio) {
  sds old_key = (sds)obj;

  if (zmalloc_page_is_underutilized(old_key, ratio)) {
    const size_t len = sdslen(old_key);
    sds fresh_key = AllocSdsWithSpace(len, 8);
    // string bytes + '\0' terminator + 8 trailing bytes
    memcpy(fresh_key, old_key, len + 8 + 1);
    return {fresh_key, true};
  }

  return {old_key, false};
}
} // namespace
bool ScoreMap::iterator::ReallocIfNeeded(float ratio, std::function<void(sds, sds)> cb) {
// Unwrap all links to correctly call SetObject()
auto* ptr = curr_entry_;
if (ptr->IsLink()) {
ptr = ptr->AsLink();
}
// Note: we do not iterate over the links. Although we could that...
auto* obj = ptr->GetObject();
auto [new_obj, reallocated] = ReallocIfNeededGeneric(obj, ratio);
if (reallocated) {
if (cb) {
cb((sds)obj, (sds)new_obj);
}
sdsfree((sds)obj);
ptr->SetObject(new_obj);
}
return reallocated;
}
} // namespace dfly

View file

@ -63,6 +63,14 @@ class ScoreMap : public DenseSet {
return BreakToPair(ptr);
}
// Tries to reduce memory fragmentation of the value by re-allocating it. Returns true if
// a re-allocation happened.
// If the callback is set, it is called with the old and the new sds. This is used by data
// structures that hold multiple storages that need to be updated simultaneously. For
// example, SortedMap contains both a B+ tree and a ScoreMap, with the former containing
// pointers to the latter. Therefore, those pointers need to be updated as well; this is
// handled by the callback below.
bool ReallocIfNeeded(float ratio, std::function<void(sds, sds)> = {});
iterator& operator++() {
Advance();
return *this;

View file

@ -4,6 +4,8 @@
#include "core/score_map.h"
#include <mimalloc.h>
#include "base/gtest.h"
#include "base/logging.h"
#include "core/mi_memory_resource.h"
@ -84,4 +86,54 @@ TEST_F(ScoreMapTest, EmptyFind) {
EXPECT_EQ(nullopt, sm_->Find("bar"));
}
// Accumulator filled by the mimalloc area visitor below. The visitor must be a
// capture-less lambda, hence the file-level variable.
uint64_t total_wasted_memory = 0;

// Builds a fragmented ScoreMap (90% of the entries erased), re-allocates the surviving
// entries via the iterator and verifies that the committed-but-unused memory shrinks
// while all surviving entries remain reachable with their scores.
TEST_F(ScoreMapTest, ReallocIfNeeded) {
  auto build_str = [](size_t i) { return to_string(i) + string(131, 'a'); };

  // Adds committed-but-unused bytes of every visited heap area to the accumulator.
  auto count_waste = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
                        size_t block_size, void* arg) {
    size_t used = block_size * area->used;
    total_wasted_memory += area->committed - used;
    return true;
  };

  for (size_t i = 0; i < 10'000; i++) {
    sm_->AddOrUpdate(build_str(i), i);
  }

  // Keep only every 10th entry so that pages become sparsely populated.
  for (size_t i = 0; i < 10'000; i++) {
    if (i % 10 == 0)
      continue;
    sm_->Erase(build_str(i));
  }

  // Start from a clean accumulator so the baseline is correct even if the test body
  // is executed more than once in the same process.
  total_wasted_memory = 0;
  mi_heap_collect(mi_heap_get_backing(), true);
  mi_heap_visit_blocks(mi_heap_get_backing(), false, count_waste, nullptr);
  size_t wasted_before = total_wasted_memory;

  size_t underutilized = 0;
  for (auto it = sm_->begin(); it != sm_->end(); ++it) {
    underutilized += zmalloc_page_is_underutilized(it->first, 0.9);
    it.ReallocIfNeeded(0.9);
  }
  // Check there are underutilized pages. Reported as a test failure rather than
  // aborting the whole test binary.
  EXPECT_GT(underutilized, 0u);

  total_wasted_memory = 0;
  mi_heap_collect(mi_heap_get_backing(), true);
  mi_heap_visit_blocks(mi_heap_get_backing(), false, count_waste, nullptr);
  size_t wasted_after = total_wasted_memory;

  // Check we waste significantly less now
  EXPECT_GT(wasted_before, wasted_after * 2);

  ASSERT_EQ(sm_->UpperBoundSize(), 1000u);
  for (size_t i = 0; i < 1000; i++) {
    auto res = sm_->Find(build_str(i * 10));
    ASSERT_EQ(res.has_value(), true);
    ASSERT_EQ((size_t)*res, i * 10);
  }
}
} // namespace dfly

View file

@ -769,5 +769,19 @@ SortedMap* SortedMap::FromListPack(PMR_NS::memory_resource* res, const uint8_t*
return zs;
}
// Walks all entries of score_map and re-allocates those reported as underutilized.
// Whenever an entry moves, the stale pointer is removed from score_tree and the new
// one is inserted. Returns true if at least one entry was re-allocated.
bool SortedMap::DefragIfNeeded(float ratio) {
  // Keeps score_tree in sync with the entry that has just been moved.
  auto sync_tree = [this](sds stale, sds fresh) {
    score_tree->Delete(stale);
    score_tree->Insert(fresh);
  };

  bool any_moved = false;
  auto it = score_map->begin();
  while (it != score_map->end()) {
    if (it.ReallocIfNeeded(ratio, sync_tree)) {
      any_moved = true;
    }
    ++it;
  }
  return any_moved;
}
} // namespace detail
} // namespace dfly

View file

@ -88,6 +88,8 @@ class SortedMap {
uint8_t* ToListPack() const;
static SortedMap* FromListPack(PMR_NS::memory_resource* res, const uint8_t* lp);
// Tries to reduce memory fragmentation by re-allocating the stored entries, with `ratio`
// being forwarded to the per-entry check. Returns true if at least one entry was
// re-allocated.
bool DefragIfNeeded(float ratio);
private:
using ScoreTree = BPTree<ScoreSds, ScoreSdsPolicy>;

View file

@ -265,4 +265,59 @@ TEST_F(SortedMapTest, MemoryUsage) {
zslFree(zsl);
}
// Accumulator filled by the mimalloc area visitor below. The visitor must be a
// capture-less lambda, hence the file-level variable.
uint64_t total_wasted_memory = 0;

// Builds a fragmented SortedMap (90% of the members deleted), defragments it and
// verifies that the committed-but-unused memory shrinks while the remaining members
// are still iterated in order with their original scores.
TEST_F(SortedMapTest, ReallocIfNeeded) {
  auto build_str = [](size_t i) { return to_string(i) + string(131, 'a'); };

  // Adds committed-but-unused bytes of every visited heap area to the accumulator.
  auto count_waste = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
                        size_t block_size, void* arg) {
    size_t used = block_size * area->used;
    total_wasted_memory += area->committed - used;
    return true;
  };

  for (size_t i = 0; i < 10'000; i++) {
    int out_flags;
    double new_val;
    auto str = build_str(i);
    sds ele = sdsnew(str.c_str());
    sm_.Add(i, ele, 0, &out_flags, &new_val);
    sdsfree(ele);
  }

  // Keep only every 10th member so that pages become sparsely populated.
  for (size_t i = 0; i < 10'000; i++) {
    if (i % 10 == 0)
      continue;
    auto str = build_str(i);
    sds ele = sdsnew(str.c_str());
    sm_.Delete(ele);
    sdsfree(ele);
  }

  // Start from a clean accumulator so the baseline is correct even if the test body
  // is executed more than once in the same process.
  total_wasted_memory = 0;
  mi_heap_collect(mi_heap_get_backing(), true);
  mi_heap_visit_blocks(mi_heap_get_backing(), false, count_waste, nullptr);
  size_t wasted_before = total_wasted_memory;

  // Same utilization threshold as the ScoreMap test; a value above 1 would make the
  // underutilization check trivially true.
  ASSERT_TRUE(sm_.DefragIfNeeded(0.9));

  total_wasted_memory = 0;
  mi_heap_collect(mi_heap_get_backing(), true);
  mi_heap_visit_blocks(mi_heap_get_backing(), false, count_waste, nullptr);
  size_t wasted_after = total_wasted_memory;

  // Check we waste significantly less now
  EXPECT_GT(wasted_before, wasted_after * 2);

  ASSERT_EQ(sm_.Size(), 1000u);

  // Members must come back ordered by score: 0, 10, 20, ...
  auto cb = [i = size_t{0}, build_str](sds ele, double score) mutable -> bool {
    EXPECT_EQ(std::string_view(ele), build_str(i * 10));
    EXPECT_EQ((size_t)score, i * 10);
    ++i;
    return true;
  };
  sm_.Iterate(0, 10000, false, cb);
}
} // namespace dfly

View file

@ -300,9 +300,11 @@ detail::SdsPair StringMap::iterator::BreakToPair(void* obj) {
bool StringMap::iterator::ReallocIfNeeded(float ratio) {
// Unwrap all links to correctly call SetObject()
auto* ptr = curr_entry_;
while (ptr->IsLink())
if (ptr->IsLink()) {
ptr = ptr->AsLink();
}
// Note: we do not iterate over the links. Although we could that...
auto* obj = ptr->GetObject();
auto [new_obj, realloced] = static_cast<StringMap*>(owner_)->ReallocIfNeeded(obj, ratio);
ptr->SetObject(new_obj);