1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: support qlist compression when accounting for memory (#4233)

Also fix "debug objhist" so that its value histogram shows the distribution of
effective malloc-used sizes for all types.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-12-01 21:52:11 +02:00 committed by GitHub
parent 872e49b0b8
commit 4a85c69db1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 89 additions and 34 deletions

View file

@ -186,9 +186,6 @@ inline ssize_t NodeSetEntry(quicklistNode* node, uint8_t* entry) {
bool CompressNode(quicklistNode* node) { bool CompressNode(quicklistNode* node) {
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_RAW); DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_RAW);
DCHECK(!node->dont_compress); DCHECK(!node->dont_compress);
#ifdef SERVER_TEST
node->attempted_compress = 1;
#endif
/* validate that the node is neither /* validate that the node is neither
* tail nor head (it has prev and next)*/ * tail nor head (it has prev and next)*/
@ -219,12 +216,13 @@ bool CompressNode(quicklistNode* node) {
return true; return true;
} }
bool CompressNodeIfNeeded(quicklistNode* node) { ssize_t CompressNodeIfNeeded(quicklistNode* node) {
DCHECK(node); DCHECK(node);
if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && !node->dont_compress) { if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && !node->dont_compress) {
return CompressNode(node); if (CompressNode(node))
return ((quicklistLZF*)node->entry)->sz - node->sz;
} }
return false; return 0;
} }
/* Uncompress the listpack in 'node' and update encoding details. /* Uncompress the listpack in 'node' and update encoding details.
@ -247,17 +245,24 @@ bool DecompressNode(bool recompress, quicklistNode* node) {
/* Decompress only compressed nodes. /* Decompress only compressed nodes.
recompress: if true, the node will be marked for recompression after decompression. recompress: if true, the node will be marked for recompression after decompression.
returns by how much the size of the node has increased.
*/ */
void DecompressNodeIfNeeded(bool recompress, quicklistNode* node) { ssize_t DecompressNodeIfNeeded(bool recompress, quicklistNode* node) {
if ((node) && (node)->encoding == QUICKLIST_NODE_ENCODING_LZF) { if ((node) && (node)->encoding == QUICKLIST_NODE_ENCODING_LZF) {
DecompressNode(recompress, node); size_t compressed_sz = ((quicklistLZF*)node->entry)->sz;
if (DecompressNode(recompress, node)) {
return node->sz - compressed_sz;
} }
} }
return 0;
}
void RecompressOnly(quicklistNode* node) { ssize_t RecompressOnly(quicklistNode* node) {
if (node->recompress && !node->dont_compress) { if (node->recompress && !node->dont_compress) {
CompressNode(node); if (CompressNode(node))
return ((quicklistLZF*)node->entry)->sz - node->sz;
} }
return 0;
} }
quicklistNode* SplitNode(quicklistNode* node, int offset, bool after, ssize_t* diff) { quicklistNode* SplitNode(quicklistNode* node, int offset, bool after, ssize_t* diff) {
@ -564,7 +569,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
if (QL_NODE_IS_PLAIN(node) || (at_tail && after) || (at_head && !after)) { if (QL_NODE_IS_PLAIN(node) || (at_tail && after) || (at_head && !after)) {
InsertPlainNode(node, elem, insert_opt); InsertPlainNode(node, elem, insert_opt);
} else { } else {
DecompressNodeIfNeeded(true, node); malloc_size_ += DecompressNodeIfNeeded(true, node);
ssize_t diff_existing = 0; ssize_t diff_existing = 0;
quicklistNode* new_node = SplitNode(node, it.offset_, after, &diff_existing); quicklistNode* new_node = SplitNode(node, it.offset_, after, &diff_existing);
quicklistNode* entry_node = InsertPlainNode(node, elem, insert_opt); quicklistNode* entry_node = InsertPlainNode(node, elem, insert_opt);
@ -576,11 +581,11 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
/* Now determine where and how to insert the new element */ /* Now determine where and how to insert the new element */
if (!full) { if (!full) {
DecompressNodeIfNeeded(true, node); malloc_size_ += DecompressNodeIfNeeded(true, node);
uint8_t* new_entry = LP_Insert(node->entry, elem, it.zi_, after ? LP_AFTER : LP_BEFORE); uint8_t* new_entry = LP_Insert(node->entry, elem, it.zi_, after ? LP_AFTER : LP_BEFORE);
malloc_size_ += NodeSetEntry(node, new_entry); malloc_size_ += NodeSetEntry(node, new_entry);
node->count++; node->count++;
RecompressOnly(node); malloc_size_ += RecompressOnly(node);
} else { } else {
bool insert_tail = at_tail && after; bool insert_tail = at_tail && after;
bool insert_head = at_head && !after; bool insert_head = at_head && !after;
@ -588,20 +593,20 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
/* If we are: at tail, next has free space, and inserting after: /* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */ * - insert entry at head of next node. */
auto* new_node = node->next; auto* new_node = node->next;
DecompressNodeIfNeeded(true, new_node); malloc_size_ += DecompressNodeIfNeeded(true, new_node);
malloc_size_ += NodeSetEntry(new_node, LP_Prepend(new_node->entry, elem)); malloc_size_ += NodeSetEntry(new_node, LP_Prepend(new_node->entry, elem));
new_node->count++; new_node->count++;
RecompressOnly(new_node); malloc_size_ += RecompressOnly(new_node);
RecompressOnly(node); malloc_size_ += RecompressOnly(node);
} else if (insert_head && avail_prev) { } else if (insert_head && avail_prev) {
/* If we are: at head, previous has free space, and inserting before: /* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */ * - insert entry at tail of previous node. */
auto* new_node = node->prev; auto* new_node = node->prev;
DecompressNodeIfNeeded(true, new_node); malloc_size_ += DecompressNodeIfNeeded(true, new_node);
malloc_size_ += NodeSetEntry(new_node, LP_Append(new_node->entry, elem)); malloc_size_ += NodeSetEntry(new_node, LP_Append(new_node->entry, elem));
new_node->count++; new_node->count++;
RecompressOnly(new_node); malloc_size_ += RecompressOnly(new_node);
RecompressOnly(node); malloc_size_ += RecompressOnly(node);
} else if (insert_tail || insert_head) { } else if (insert_tail || insert_head) {
/* If we are: full, and our prev/next has no available space, then: /* If we are: full, and our prev/next has no available space, then:
* - create new node and attach to qlist */ * - create new node and attach to qlist */
@ -610,7 +615,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
} else { } else {
/* else, node is full we need to split it. */ /* else, node is full we need to split it. */
/* covers both after and !after cases */ /* covers both after and !after cases */
DecompressNodeIfNeeded(true, node); malloc_size_ += DecompressNodeIfNeeded(true, node);
ssize_t diff_existing = 0; ssize_t diff_existing = 0;
auto* new_node = SplitNode(node, it.offset_, after, &diff_existing); auto* new_node = SplitNode(node, it.offset_, after, &diff_existing);
auto func = after ? LP_Prepend : LP_Append; auto func = after ? LP_Prepend : LP_Append;
@ -710,8 +715,8 @@ void QList::Compress(quicklistNode* node) {
int depth = 0; int depth = 0;
int in_depth = 0; int in_depth = 0;
while (depth++ < compress_) { while (depth++ < compress_) {
DecompressNodeIfNeeded(false, forward); malloc_size_ += DecompressNodeIfNeeded(false, forward);
DecompressNodeIfNeeded(false, reverse); malloc_size_ += DecompressNodeIfNeeded(false, reverse);
if (forward == node || reverse == node) if (forward == node || reverse == node)
in_depth = 1; in_depth = 1;
@ -726,11 +731,11 @@ void QList::Compress(quicklistNode* node) {
} }
if (!in_depth && node) if (!in_depth && node)
CompressNodeIfNeeded(node); malloc_size_ += CompressNodeIfNeeded(node);
/* At this point, forward and reverse are one node beyond depth */ /* At this point, forward and reverse are one node beyond depth */
CompressNodeIfNeeded(forward); malloc_size_ += CompressNodeIfNeeded(forward);
CompressNodeIfNeeded(reverse); malloc_size_ += CompressNodeIfNeeded(reverse);
} }
/* Attempt to merge listpacks within two nodes on either side of 'center'. /* Attempt to merge listpacks within two nodes on either side of 'center'.
@ -801,8 +806,8 @@ quicklistNode* QList::MergeNodes(quicklistNode* center) {
* Returns the input node picked to merge against or NULL if * Returns the input node picked to merge against or NULL if
* merging was not possible. */ * merging was not possible. */
quicklistNode* QList::ListpackMerge(quicklistNode* a, quicklistNode* b) { quicklistNode* QList::ListpackMerge(quicklistNode* a, quicklistNode* b) {
DecompressNodeIfNeeded(false, a); malloc_size_ += DecompressNodeIfNeeded(false, a);
DecompressNodeIfNeeded(false, b); malloc_size_ += DecompressNodeIfNeeded(false, b);
if ((lpMerge(&a->entry, &b->entry))) { if ((lpMerge(&a->entry, &b->entry))) {
/* We merged listpacks! Now remove the unused quicklistNode. */ /* We merged listpacks! Now remove the unused quicklistNode. */
quicklistNode *keep = NULL, *nokeep = NULL; quicklistNode *keep = NULL, *nokeep = NULL;
@ -1047,14 +1052,14 @@ bool QList::Erase(const long start, unsigned count) {
if (delete_entire_node || QL_NODE_IS_PLAIN(node)) { if (delete_entire_node || QL_NODE_IS_PLAIN(node)) {
DelNode(node); DelNode(node);
} else { } else {
DecompressNodeIfNeeded(true, node); malloc_size_ += DecompressNodeIfNeeded(true, node);
malloc_size_ += NodeSetEntry(node, lpDeleteRange(node->entry, offset, del)); malloc_size_ += NodeSetEntry(node, lpDeleteRange(node->entry, offset, del));
node->count -= del; node->count -= del;
count_ -= del; count_ -= del;
if (node->count == 0) { if (node->count == 0) {
DelNode(node); DelNode(node);
} else { } else {
RecompressOnly(node); malloc_size_ += RecompressOnly(node);
} }
} }
@ -1081,7 +1086,7 @@ bool QList::Iterator::Next() {
int plain = QL_NODE_IS_PLAIN(current_); int plain = QL_NODE_IS_PLAIN(current_);
if (!zi_) { if (!zi_) {
/* If !zi, use current index. */ /* If !zi, use current index. */
DecompressNodeIfNeeded(true, current_); const_cast<QList*>(owner_)->malloc_size_ += DecompressNodeIfNeeded(true, current_);
if (ABSL_PREDICT_FALSE(plain)) if (ABSL_PREDICT_FALSE(plain))
zi_ = current_->entry; zi_ = current_->entry;
else else

View file

@ -19,7 +19,9 @@ extern "C" {
#include "base/logging.h" #include "base/logging.h"
#include "core/compact_object.h" #include "core/compact_object.h"
#include "core/qlist.h" #include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h" #include "core/string_map.h"
#include "core/string_set.h"
#include "server/blocking_controller.h" #include "server/blocking_controller.h"
#include "server/container_utils.h" #include "server/container_utils.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
@ -65,6 +67,7 @@ struct ObjInfo {
// for lists - how many nodes do they have. // for lists - how many nodes do they have.
unsigned num_nodes = 0; unsigned num_nodes = 0;
unsigned num_compressed = 0;
enum LockStatus { NONE, S, X } lock_status = NONE; enum LockStatus { NONE, S, X } lock_status = NONE;
@ -212,11 +215,23 @@ unsigned AddObjHist(PrimeIterator it, ObjHist* hist) {
if (pv.ObjType() == OBJ_LIST) { if (pv.ObjType() == OBJ_LIST) {
IterateList(pv, per_entry_cb, 0, -1); IterateList(pv, per_entry_cb, 0, -1);
if (pv.Encoding() == kEncodingQL2) {
const QList* ql = static_cast<QList*>(pv.RObjPtr());
val_len = ql->MallocUsed(true);
}
} else if (pv.ObjType() == OBJ_ZSET) { } else if (pv.ObjType() == OBJ_ZSET) {
IterateSortedSet(pv.GetRobjWrapper(), IterateSortedSet(pv.GetRobjWrapper(),
[&](ContainerEntry entry, double) { return per_entry_cb(entry); }); [&](ContainerEntry entry, double) { return per_entry_cb(entry); });
if (pv.Encoding() == OBJ_ENCODING_SKIPLIST) {
detail::SortedMap* smap = static_cast<detail::SortedMap*>(pv.RObjPtr());
val_len = smap->MallocSize();
}
} else if (pv.ObjType() == OBJ_SET) { } else if (pv.ObjType() == OBJ_SET) {
IterateSet(pv, per_entry_cb); IterateSet(pv, per_entry_cb);
if (pv.Encoding() == kEncodingStrMap2) {
StringSet* ss = static_cast<StringSet*>(pv.RObjPtr());
val_len = ss->ObjMallocUsed() + ss->SetMallocUsed();
}
} else if (pv.ObjType() == OBJ_HASH) { } else if (pv.ObjType() == OBJ_HASH) {
if (pv.Encoding() == kEncodingListPack) { if (pv.Encoding() == kEncodingListPack) {
uint8_t intbuf[LP_INTBUF_SIZE]; uint8_t intbuf[LP_INTBUF_SIZE];
@ -322,6 +337,14 @@ ObjInfo InspectOp(ConnectionContext* cntx, string_view key) {
if (pv.ObjType() == OBJ_LIST && pv.Encoding() == kEncodingQL2) { if (pv.ObjType() == OBJ_LIST && pv.Encoding() == kEncodingQL2) {
const QList* qlist = static_cast<const QList*>(pv.RObjPtr()); const QList* qlist = static_cast<const QList*>(pv.RObjPtr());
oinfo.num_nodes = qlist->node_count(); oinfo.num_nodes = qlist->node_count();
auto* node = qlist->Head();
while (node) {
if (node->encoding == QUICKLIST_NODE_ENCODING_LZF) {
++oinfo.num_compressed;
}
node = node->next;
}
} }
if (pv.IsExternal()) { if (pv.IsExternal()) {
@ -357,13 +380,31 @@ OpResult<ValueCompressInfo> EstimateCompression(ConnectionContext* cntx, string_
} }
// Only strings are supported right now. // Only strings are supported right now.
if (it->second.ObjType() != OBJ_STRING) { if (it->second.ObjType() != OBJ_STRING && it->second.ObjType() != OBJ_LIST) {
return OpStatus::WRONG_TYPE; return OpStatus::WRONG_TYPE;
} }
ValueCompressInfo info;
if (it->second.ObjType() == OBJ_LIST) {
if (it->second.Encoding() != kEncodingQL2) {
return OpStatus::WRONG_TYPE;
}
const QList* src = static_cast<const QList*>(it->second.RObjPtr());
info.raw_size = src->MallocUsed(true);
QList qlist(-2, 1);
auto copy_cb = [&](QList::Entry entry) {
qlist.Push(entry.view(), QList::HEAD);
return true;
};
src->Iterate(copy_cb, 0, -1);
info.compressed_size = qlist.MallocUsed(true);
return info;
}
string scratch; string scratch;
string_view value = it->second.GetSlice(&scratch); string_view value = it->second.GetSlice(&scratch);
ValueCompressInfo info;
info.raw_size = value.size(); info.raw_size = value.size();
info.compressed_size = info.raw_size; info.compressed_size = info.raw_size;
@ -849,7 +890,10 @@ void DebugCmd::Inspect(string_view key, CmdArgList args, facade::SinkReplyBuilde
ShardId sid = Shard(key, ess.size()); ShardId sid = Shard(key, ess.size());
VLOG(1) << "DebugCmd::Inspect " << key; VLOG(1) << "DebugCmd::Inspect " << key;
bool check_compression = (args.size() == 1) && ArgS(args, 0) == "COMPRESS"; bool check_compression = false;
if (args.size() == 1) {
check_compression = absl::AsciiStrToUpper(ArgS(args, 0)) == "COMPRESS";
}
string resp; string resp;
if (check_compression) { if (check_compression) {
@ -886,7 +930,13 @@ void DebugCmd::Inspect(string_view key, CmdArgList args, facade::SinkReplyBuilde
} }
if (res.num_nodes) { if (res.num_nodes) {
StrAppend(&resp, " ns:", res.num_nodes); // node count
StrAppend(&resp, " nc:", res.num_nodes);
}
if (res.num_compressed) {
// compressed nodes
StrAppend(&resp, " cn:", res.num_compressed);
} }
if (res.lock_status != ObjInfo::NONE) { if (res.lock_status != ObjInfo::NONE) {