1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: support rdb loading and container utils with QList. (#4109)

generic_family_test and rdb_test pass with qlist enabled.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-11-11 12:13:10 +02:00 committed by GitHub
parent 745db2c82f
commit 79aa5d490d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 88 additions and 54 deletions

View file

@ -82,7 +82,8 @@ size_t NodeNegFillLimit(int fill) {
}
const uint8_t* uint_ptr(string_view sv) {
return reinterpret_cast<const uint8_t*>(sv.data());
static uint8_t empty = 0;
return sv.empty() ? &empty : reinterpret_cast<const uint8_t*>(sv.data());
}
bool IsLargeElement(size_t sz, int fill) {
@ -136,10 +137,6 @@ quicklistNode* CreateNode() {
return node;
}
uint8_t* LP_FromElem(string_view elem) {
return lpPrepend(lpNew(0), uint_ptr(elem), elem.size());
}
uint8_t* LP_Insert(uint8_t* lp, string_view elem, uint8_t* pos, int lp_where) {
return lpInsertString(lp, uint_ptr(elem), elem.size(), pos, lp_where, NULL);
}
@ -155,15 +152,16 @@ uint8_t* LP_Prepend(uint8_t* lp, string_view elem) {
quicklistNode* CreateNode(int container, string_view value) {
quicklistNode* new_node = CreateNode();
new_node->container = container;
new_node->sz = value.size();
new_node->count++;
new_node->count = 1;
if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
DCHECK(!value.empty());
new_node->entry = (uint8_t*)zmalloc(new_node->sz);
memcpy(new_node->entry, value.data(), new_node->sz);
new_node->sz = value.size();
} else {
new_node->entry = LP_FromElem(value);
new_node->entry = LP_Prepend(lpNew(0), value);
new_node->sz = lpBytes(new_node->entry);
}
return new_node;
@ -467,13 +465,10 @@ bool QList::PushHead(string_view value) {
count_++;
if (ABSL_PREDICT_TRUE(NodeAllowInsert(head_, fill_, sz))) {
head_->entry = lpPrepend(head_->entry, uint_ptr(value), sz);
head_->entry = LP_Prepend(head_->entry, value);
NodeUpdateSz(head_);
} else {
quicklistNode* node = CreateNode();
node->entry = LP_FromElem(value);
NodeUpdateSz(node);
quicklistNode* node = CreateNode(QUICKLIST_NODE_CONTAINER_PACKED, value);
InsertNode(head_, node, BEFORE);
}
@ -492,16 +487,16 @@ bool QList::PushTail(string_view value) {
count_++;
if (ABSL_PREDICT_TRUE(NodeAllowInsert(orig, fill_, sz))) {
orig->entry = lpAppend(orig->entry, uint_ptr(value), sz);
orig->entry = LP_Append(orig->entry, value);
NodeUpdateSz(orig);
} else {
quicklistNode* node = CreateNode();
node->entry = LP_FromElem(value);
NodeUpdateSz(node);
InsertNode(orig, node, AFTER);
orig->count++;
return false;
}
tail_->count++;
return (orig != tail_);
quicklistNode* node = CreateNode(QUICKLIST_NODE_CONTAINER_PACKED, value);
InsertNode(orig, node, AFTER);
return true;
}
void QList::InsertPlainNode(quicklistNode* old_node, string_view value, InsertOpt insert_opt) {
@ -559,10 +554,9 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
InsertPlainNode(tail_, elem, insert_opt);
return;
}
new_node = CreateNode();
new_node->entry = LP_FromElem(elem);
new_node = CreateNode(QUICKLIST_NODE_CONTAINER_PACKED, elem);
InsertNode(NULL, new_node, insert_opt);
new_node->count++;
count_++;
return;
}
@ -636,10 +630,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
} else if (full && ((at_tail && !avail_next && after) || (at_head && !avail_prev && !after))) {
/* If we are: full, and our prev/next has no available space, then:
* - create new node and attach to qlist */
new_node = CreateNode();
new_node->entry = LP_FromElem(elem);
new_node->count++;
NodeUpdateSz(new_node);
new_node = CreateNode(QUICKLIST_NODE_CONTAINER_PACKED, elem);
InsertNode(node, new_node, insert_opt);
} else if (full) {
/* else, node is full we need to split it. */

View file

@ -5,6 +5,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
@ -152,24 +153,41 @@ quicklistEntry QLEntry() {
}
bool IterateList(const PrimeValue& pv, const IterateFunc& func, long start, long end) {
quicklist* ql = static_cast<quicklist*>(pv.RObjPtr());
long llen = quicklistCount(ql);
if (end < 0 || end >= llen)
end = llen - 1;
quicklistIter* qiter = quicklistGetIteratorAtIdx(ql, AL_START_HEAD, start);
quicklistEntry entry = QLEntry();
long lrange = end - start + 1;
bool success = true;
while (success && quicklistNext(qiter, &entry) && lrange-- > 0) {
if (entry.value) {
success = func(ContainerEntry{reinterpret_cast<char*>(entry.value), entry.sz});
} else {
success = func(ContainerEntry{entry.longval});
if (pv.Encoding() == OBJ_ENCODING_QUICKLIST) {
quicklist* ql = static_cast<quicklist*>(pv.RObjPtr());
long llen = quicklistCount(ql);
if (end < 0 || end >= llen)
end = llen - 1;
quicklistIter* qiter = quicklistGetIteratorAtIdx(ql, AL_START_HEAD, start);
quicklistEntry entry = QLEntry();
long lrange = end - start + 1;
while (success && quicklistNext(qiter, &entry) && lrange-- > 0) {
if (entry.value) {
success = func(ContainerEntry{reinterpret_cast<char*>(entry.value), entry.sz});
} else {
success = func(ContainerEntry{entry.longval});
}
}
quicklistReleaseIterator(qiter);
return success;
}
quicklistReleaseIterator(qiter);
DCHECK_EQ(pv.Encoding(), kEncodingQL2);
QList* ql = static_cast<QList*>(pv.RObjPtr());
ql->Iterate(
[&](const QList::Entry& entry) {
if (entry.is_int()) {
success = func(ContainerEntry{entry.ival()});
} else {
success = func(ContainerEntry{entry.view().data(), entry.view().size()});
}
return success;
},
start, end);
return success;
}

View file

@ -32,6 +32,7 @@ extern "C" {
#include "base/logging.h"
#include "core/bloom.h"
#include "core/json/json_object.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
@ -57,7 +58,7 @@ extern "C" {
ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size);
ABSL_DECLARE_FLAG(int32_t, list_compress_depth);
ABSL_DECLARE_FLAG(uint32_t, dbnum);
ABSL_DECLARE_FLAG(bool, list_experimental_v2);
namespace dfly {
using namespace std;
@ -709,20 +710,34 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
}
void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
quicklist* ql;
quicklist* ql = nullptr;
QList* qlv2 = nullptr;
if (config_.append) {
if (!EnsureObjEncoding(OBJ_LIST, OBJ_ENCODING_QUICKLIST)) {
if (pv_->ObjType() != OBJ_LIST) {
ec_ = RdbError(errc::invalid_rdb_type);
return;
}
ql = static_cast<quicklist*>(pv_->RObjPtr());
if (pv_->Encoding() == OBJ_ENCODING_QUICKLIST) {
ql = static_cast<quicklist*>(pv_->RObjPtr());
} else {
DCHECK_EQ(pv_->Encoding(), kEncodingQL2);
qlv2 = static_cast<QList*>(pv_->RObjPtr());
}
} else {
ql = quicklistNew(GetFlag(FLAGS_list_max_listpack_size), GetFlag(FLAGS_list_compress_depth));
if (absl::GetFlag(FLAGS_list_experimental_v2)) {
qlv2 = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
} else {
ql = quicklistNew(GetFlag(FLAGS_list_max_listpack_size), GetFlag(FLAGS_list_compress_depth));
}
}
auto cleanup = absl::Cleanup([&] {
if (!config_.append) {
quicklistRelease(ql);
if (ql)
quicklistRelease(ql);
else
CompactObj::DeleteMR<QList>(qlv2);
}
});
@ -737,7 +752,11 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
lp = (uint8_t*)zmalloc(sv.size());
::memcpy(lp, (uint8_t*)sv.data(), sv.size());
quicklistAppendPlainNode(ql, lp, sv.size());
if (ql)
quicklistAppendPlainNode(ql, lp, sv.size());
else
qlv2->AppendPlain(lp, sv.size());
return true;
}
@ -774,13 +793,16 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
lp = lpShrinkToFit(lp);
}
quicklistAppendListpack(ql, lp);
if (ql)
quicklistAppendListpack(ql, lp);
else
qlv2->AppendListpack(lp);
return true;
});
if (ec_)
return;
if (quicklistCount(ql) == 0) {
if ((ql && quicklistCount(ql) == 0) || (qlv2 && qlv2->Size() == 0)) {
ec_ = RdbError(errc::empty_key);
return;
}
@ -788,7 +810,10 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
std::move(cleanup).Cancel();
if (!config_.append) {
pv_->InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
if (ql)
pv_->InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
else
pv_->InitRobj(OBJ_LIST, kEncodingQL2, qlv2);
}
}