1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: implement QList::Erase functionality (#4092)

* chore: implement QList::Erase functionality

    Also add a parametrized test covering fill/compress options.
    Fix a compression bug introduced when copying the code.
    Introduce move c'tor/operator.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* chore: implement QList::Replace functionality

Also add a parametrized test covering fill/compress options.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-11-09 20:06:32 +02:00 committed by GitHub
parent d3ef0a3630
commit 1819e51f78
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 330 additions and 71 deletions

View file

@ -24,7 +24,6 @@ using namespace std;
* size, the maximum limit is bigger, but still safe.
* 8k is a recommended / default size limit */
#define SIZE_SAFETY_LIMIT 8192
#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)
/* Maximum estimate of the listpack entry overhead.
* Although in the worst case(sz < 64), we will waste 6 bytes in one
@ -54,7 +53,7 @@ using namespace std;
if ((_node)->recompress) \
CompressNode((_node)); \
else \
Compress(_node); \
this->Compress(_node); \
} while (0)
namespace dfly {
@ -90,7 +89,7 @@ bool IsLargeElement(size_t sz, int fill) {
if (ABSL_PREDICT_FALSE(packed_threshold != 0))
return sz >= packed_threshold;
if (fill >= 0)
return !sizeMeetsSafetyLimit(sz);
return sz > SIZE_SAFETY_LIMIT;
else
return sz > NodeNegFillLimit(fill);
}
@ -178,15 +177,15 @@ void NodeUpdateSz(quicklistNode* node) {
* Returns true if listpack compressed successfully.
* Returns false if compression failed or if listpack too small to compress. */
bool CompressNode(quicklistNode* node) {
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_RAW);
DCHECK(!node->dont_compress);
#ifdef SERVER_TEST
node->attempted_compress = 1;
#endif
if (node->dont_compress)
return false;
/* validate that the node is neither
* tail nor head (it has prev and next)*/
assert(node->prev && node->next);
DCHECK(node->prev && node->next);
node->recompress = 0;
/* Don't bother compressing small values */
@ -213,6 +212,14 @@ bool CompressNode(quicklistNode* node) {
return true;
}
bool CompressNodeIfNeeded(quicklistNode* node) {
DCHECK(node);
if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && !node->dont_compress) {
return CompressNode(node);
}
return false;
}
/* Uncompress the listpack in 'node' and update encoding details.
* Returns 1 on successful decode, 0 on failure to decode. */
bool DecompressNode(bool recompress, quicklistNode* node) {
@ -246,15 +253,6 @@ void RecompressOnly(quicklistNode* node) {
}
}
bool ElemCompare(const QList::Entry& entry, string_view elem) {
if (entry.value) {
return entry.view() == elem;
}
absl::AlphaNum an(entry.longval);
return elem == an.Piece();
}
quicklistNode* SplitNode(quicklistNode* node, int offset, bool after) {
size_t zl_sz = node->sz;
@ -293,23 +291,53 @@ QList::QList() : fill_(-2), compress_(0), bookmark_count_(0) {
QList::QList(int fill, int compress) : fill_(fill), compress_(compress), bookmark_count_(0) {
}
QList::~QList() {
unsigned long len;
quicklistNode *current, *next;
QList::QList(QList&& other)
: head_(other.head_),
tail_(other.tail_),
count_(other.count_),
len_(other.len_),
fill_(other.fill_),
compress_(other.compress_),
bookmark_count_(other.bookmark_count_) {
other.head_ = other.tail_ = nullptr;
other.len_ = other.count_ = 0;
}
current = head_;
len = len_;
while (len--) {
next = current->next;
QList::~QList() {
Clear();
}
QList& QList::operator=(QList&& other) {
if (this != &other) {
Clear();
head_ = other.head_;
tail_ = other.tail_;
len_ = other.len_;
count_ = other.count_;
fill_ = other.fill_;
compress_ = other.compress_;
bookmark_count_ = other.bookmark_count_;
other.head_ = other.tail_ = nullptr;
other.len_ = other.count_ = 0;
}
return *this;
}
void QList::Clear() {
quicklistNode* current = head_;
while (len_) {
quicklistNode* next = current->next;
zfree(current->entry);
count_ -= current->count;
zfree(current);
len_--;
current = next;
}
head_ = tail_ = nullptr;
count_ = 0;
}
void QList::Push(string_view value, Where where) {
@ -353,7 +381,7 @@ bool QList::Insert(std::string_view pivot, std::string_view elem, InsertOpt opt)
Iterator it = GetIterator(HEAD);
while (it.Next()) {
if (ElemCompare(it.Get(), pivot)) {
if (it.Get() == pivot) {
Insert(it, elem, opt);
return true;
}
@ -435,7 +463,6 @@ bool QList::PushTail(string_view value) {
} else {
quicklistNode* node = CreateNode();
node->entry = LP_FromElem(value);
NodeUpdateSz(node);
InsertNode(orig, node, AFTER);
}
@ -599,7 +626,63 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
}
void QList::Replace(Iterator it, std::string_view elem) {
// TODO
quicklistNode* node = it.current_;
unsigned char* newentry;
size_t sz = elem.size();
if (ABSL_PREDICT_TRUE(!QL_NODE_IS_PLAIN(node) && !IsLargeElement(sz, fill_) &&
(newentry = lpReplace(node->entry, &it.zi_, uint_ptr(elem), sz)) != NULL)) {
node->entry = newentry;
NodeUpdateSz(node);
/* quicklistNext() and quicklistGetIteratorEntryAtIdx() provide an uncompressed node */
quicklistCompress(node);
} else if (QL_NODE_IS_PLAIN(node)) {
if (IsLargeElement(sz, fill_)) {
zfree(node->entry);
node->entry = (uint8_t*)zmalloc(sz);
node->sz = sz;
memcpy(node->entry, elem.data(), sz);
quicklistCompress(node);
} else {
Insert(it, elem, AFTER);
DelNode(node);
}
} else { /* The node is full or data is a large element */
quicklistNode *split_node = NULL, *new_node;
node->dont_compress = 1; /* Prevent compression in InsertNode() */
/* If the entry is not at the tail, split the node at the entry's offset. */
if (it.offset_ != node->count - 1 && it.offset_ != -1)
split_node = SplitNode(node, it.offset_, 1);
/* Create a new node and insert it after the original node.
* If the original node was split, insert the split node after the new node. */
new_node = CreateNode(IsLargeElement(sz, fill_) ? QUICKLIST_NODE_CONTAINER_PLAIN
: QUICKLIST_NODE_CONTAINER_PACKED,
elem);
InsertNode(node, new_node, AFTER);
if (split_node)
InsertNode(new_node, split_node, AFTER);
count_++;
/* Delete the replaced element. */
if (node->count == 1) {
DelNode(node);
} else {
unsigned char* p = lpSeek(node->entry, -1);
DelPackedIndex(node, &p);
node->dont_compress = 0; /* Re-enable compression */
new_node = MergeNodes(new_node);
/* We can't know if the current node and its sibling nodes are correctly compressed,
* and we don't know if they are within the range of compress depth, so we need to
* use quicklistCompress() for compression, which checks if node is within compress
* depth before compressing. */
quicklistCompress(new_node);
quicklistCompress(new_node->prev);
if (new_node->next)
quicklistCompress(new_node->next);
}
}
}
/* Force 'quicklist' to meet compression guidelines set by compress depth.
@ -642,11 +725,11 @@ void QList::Compress(quicklistNode* node) {
}
if (!in_depth)
CompressNode(node);
CompressNodeIfNeeded(node);
/* At this point, forward and reverse are one node beyond depth */
CompressNode(forward);
CompressNode(reverse);
CompressNodeIfNeeded(forward);
CompressNodeIfNeeded(reverse);
}
/* Attempt to merge listpacks within two nodes on either side of 'center'.
@ -770,6 +853,32 @@ void QList::DelNode(quicklistNode* node) {
zfree(node);
}
/* Delete one entry from list given the node for the entry and a pointer
* to the entry in the node.
*
* Note: DelPackedIndex() *requires* uncompressed nodes because you
* already had to get *p from an uncompressed node somewhere.
*
* Returns 1 if the entire node was deleted, 0 if node still exists.
* Also updates in/out param 'p' with the next offset in the listpack. */
bool QList::DelPackedIndex(quicklistNode* node, uint8_t** p) {
DCHECK(!QL_NODE_IS_PLAIN(node));
bool gone = false;
node->entry = lpDelete(node->entry, *p, p);
node->count--;
if (node->count == 0) {
gone = true;
DelNode(node);
} else {
NodeUpdateSz(node);
}
count_--;
/* If we deleted the node, the original node is no longer valid */
return gone;
}
auto QList::GetIterator(Where where) const -> Iterator {
Iterator it;
it.owner_ = this;
@ -839,6 +948,55 @@ auto QList::GetIterator(long idx) const -> Iterator {
return iter;
}
auto QList::Erase(Iterator it) -> Iterator {
DCHECK(it.current_);
quicklistNode* node = it.current_;
quicklistNode* prev = node->prev;
quicklistNode* next = node->next;
bool deleted_node = false;
if (QL_NODE_IS_PLAIN(node)) {
DelNode(node);
deleted_node = true;
} else {
deleted_node = DelPackedIndex(node, &it.zi_);
}
/* after delete, the zi is now invalid for any future usage. */
it.zi_ = NULL;
/* If current node is deleted, we must update iterator node and offset. */
if (deleted_node) {
if (it.direction_ == AL_START_HEAD) {
it.current_ = next;
it.offset_ = 0;
} else if (it.direction_ == AL_START_TAIL) {
it.current_ = prev;
it.offset_ = -1;
}
}
/* else if (!deleted_node), no changes needed.
* we already reset iter->zi above, and the existing iter->offset
* doesn't move again because:
* - [1, 2, 3] => delete offset 1 => [1, 3]: next element still offset 1
* - [1, 2, 3] => delete offset 0 => [2, 3]: next element still offset 0
* if we deleted the last element at offset N and now
* length of this listpack is N-1, the next call into
* quicklistNext() will jump to the next node. */
return it;
}
bool QList::Entry::operator==(std::string_view sv) const {
if (std::holds_alternative<int64_t>(value_)) {
char buf[absl::numbers_internal::kFastToBufferSize];
char* end = absl::numbers_internal::FastIntToBuffer(std::get<int64_t>(value_), buf);
return sv == std::string_view(buf, end - buf);
}
return view() == sv;
}
bool QList::Iterator::Next() {
if (!current_)
return false;

View file

@ -11,6 +11,7 @@ extern "C" {
#include <functional>
#include <optional>
#include <string>
#include <variant>
namespace dfly {
@ -19,52 +20,35 @@ class QList {
enum Where { TAIL, HEAD };
// Provides wrapper around the references to the listpack entries.
struct Entry {
Entry(const char* value, size_t length) : value{value}, length{length} {
}
Entry(long long longval) : value{nullptr}, longval{longval} {
class Entry {
std::variant<std::string_view, int64_t> value_;
public:
Entry(const char* value, size_t length) : value_{std::string_view(value, length)} {
}
// Assumes value is not null.
explicit Entry(int64_t longval) : value_{longval} {
}
// Assumes value is not int64.
std::string_view view() const {
return {value, length};
return std::get<std::string_view>(value_);
}
const char* value;
union {
size_t length;
long long longval;
};
int64_t ival() const {
return std::get<int64_t>(value_);
}
bool operator==(std::string_view sv) const;
std::string to_string() const {
if (std::holds_alternative<int64_t>(value_)) {
return std::to_string(std::get<int64_t>(value_));
}
return std::string(view());
}
};
using IterateFunc = std::function<bool(Entry)>;
enum InsertOpt { BEFORE, AFTER };
QList();
QList(int fill, int compress);
QList(const QList&) = delete;
~QList();
QList& operator=(const QList&) = delete;
size_t Size() const {
return count_;
}
void Push(std::string_view value, Where where);
void AppendListpack(unsigned char* zl);
void AppendPlain(unsigned char* zl, size_t sz);
// Returns true if pivot found and elem inserted, false otherwise.
bool Insert(std::string_view pivot, std::string_view elem, InsertOpt opt);
// Returns true if item was replaced, false if index is out of range.
bool Replace(long index, std::string_view elem);
size_t MallocUsed() const;
void Iterate(IterateFunc cb, long start, long end) const;
class Iterator {
public:
Entry Get() const;
@ -82,6 +66,38 @@ class QList {
friend class QList;
};
using IterateFunc = std::function<bool(Entry)>;
enum InsertOpt { BEFORE, AFTER };
QList();
QList(int fill, int compress);
QList(QList&&);
QList(const QList&) = delete;
~QList();
QList& operator=(const QList&) = delete;
QList& operator=(QList&&);
size_t Size() const {
return count_;
}
void Clear();
void Push(std::string_view value, Where where);
void AppendListpack(unsigned char* zl);
void AppendPlain(unsigned char* zl, size_t sz);
// Returns true if pivot found and elem inserted, false otherwise.
bool Insert(std::string_view pivot, std::string_view elem, InsertOpt opt);
// Returns true if item was replaced, false if index is out of range.
bool Replace(long index, std::string_view elem);
size_t MallocUsed() const;
void Iterate(IterateFunc cb, long start, long end) const;
// Returns an iterator to tail or the head of the list.
// To mirror the quicklist interface, the iterator is not valid until Next() is called.
// TODO: to fix this.
@ -97,6 +113,8 @@ class QList {
return len_;
}
Iterator Erase(Iterator it);
private:
bool AllowCompression() const {
return compress_ != 0;
@ -118,6 +136,7 @@ class QList {
quicklistNode* ListpackMerge(quicklistNode* a, quicklistNode* b);
void DelNode(quicklistNode* node);
bool DelPackedIndex(quicklistNode* node, uint8_t** p);
quicklistNode* head_ = nullptr;
quicklistNode* tail_ = nullptr;

View file

@ -4,9 +4,11 @@
#include "core/qlist.h"
#include <absl/strings/str_cat.h>
#include <gmock/gmock.h>
#include "base/gtest.h"
#include "base/logging.h"
#include "core/mi_memory_resource.h"
extern "C" {
@ -30,6 +32,20 @@ class QListTest : public ::testing::Test {
init_zmalloc_threadlocal(tlh);
}
static void TearDownTestSuite() {
mi_heap_collect(mi_heap_get_backing(), true);
auto cb_visit = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
size_t block_size, void* arg) {
LOG(ERROR) << "Unfreed allocations: block_size " << block_size
<< ", allocated: " << area->used * block_size;
return true;
};
mi_heap_visit_blocks(mi_heap_get_backing(), false /* do not visit all blocks*/, cb_visit,
nullptr);
}
vector<string> ToItems() const;
MiMemoryResource mr_;
@ -39,7 +55,7 @@ class QListTest : public ::testing::Test {
vector<string> QListTest::ToItems() const {
vector<string> res;
auto cb = [&](const QList::Entry& e) {
res.push_back(e.value ? string(e.view()) : to_string(e.longval));
res.push_back(e.to_string());
return true;
};
@ -88,9 +104,11 @@ TEST_F(QListTest, ListPack) {
uint8_t* lp2 = lpAppend(lpNew(0), (uint8_t*)sv.data(), sv.size());
ASSERT_EQ(lpBytes(lp1), lpBytes(lp2));
ASSERT_EQ(0, memcmp(lp1, lp2, lpBytes(lp1)));
lpFree(lp1);
lpFree(lp2);
}
TEST_F(QListTest, Insert) {
TEST_F(QListTest, InsertDelete) {
EXPECT_FALSE(ql_.Insert("abc", "def", QList::BEFORE));
ql_.Push("abc", QList::HEAD);
EXPECT_TRUE(ql_.Insert("abc", "def", QList::BEFORE));
@ -99,6 +117,70 @@ TEST_F(QListTest, Insert) {
EXPECT_TRUE(ql_.Insert("abc", "123456", QList::AFTER));
items = ToItems();
EXPECT_THAT(items, ElementsAre("def", "abc", "123456"));
auto it = ql_.GetIterator(QList::HEAD);
ASSERT_TRUE(it.Next());
// Erase the items one by one.
it = ql_.Erase(it);
items = ToItems();
EXPECT_THAT(items, ElementsAre("abc", "123456"));
ASSERT_TRUE(it.Next());
ASSERT_EQ("abc", it.Get().view());
it = ql_.Erase(it);
items = ToItems();
EXPECT_THAT(items, ElementsAre("123456"));
ASSERT_TRUE(it.Next());
ASSERT_EQ(123456, it.Get().ival());
it = ql_.Erase(it);
items = ToItems();
EXPECT_THAT(items, ElementsAre());
ASSERT_FALSE(it.Next());
EXPECT_EQ(0, ql_.Size());
}
using FillCompress = tuple<int, unsigned>;
class PrintToFillCompress {
public:
std::string operator()(const TestParamInfo<FillCompress>& info) const {
int fill = get<0>(info.param);
int compress = get<1>(info.param);
string fill_str = fill >= 0 ? absl::StrCat("f", fill) : absl::StrCat("fminus", -fill);
return absl::StrCat(fill_str, "compress", compress);
}
};
class OptionsTest : public QListTest, public WithParamInterface<FillCompress> {};
INSTANTIATE_TEST_SUITE_P(Matrix, OptionsTest,
Combine(Values(-5, -4, -3, -2, -1, 0, 1, 2, 32, 66, 128, 999),
Values(0, 1, 2, 3, 4, 5, 6, 10)),
PrintToFillCompress());
TEST_P(OptionsTest, Numbers) {
auto [fill, compress] = GetParam();
ql_ = QList(fill, compress);
array<int64_t, 5000> nums;
for (unsigned i = 0; i < nums.size(); i++) {
nums[i] = -5157318210846258176 + i;
string val = absl::StrCat(nums[i]);
ql_.Push(val, QList::TAIL);
}
ql_.Push("xxxxxxxxxxxxxxxxxxxx", QList::TAIL);
for (unsigned i = 0; i < nums.size(); i++) {
auto it = ql_.GetIterator(i);
ASSERT_TRUE(it.Next());
ASSERT_EQ(nums[i], it.Get().ival()) << i;
}
auto it = ql_.GetIterator(nums.size());
ASSERT_TRUE(it.Next());
EXPECT_EQ("xxxxxxxxxxxxxxxxxxxx", it.Get().view());
}
}; // namespace dfly