1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: Add initial bindings for QList in list_family (#4093)

The feature is guarded by list_experimental_v2 flag, which is disabled.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-11-10 07:40:50 +02:00 committed by GitHub
parent 1819e51f78
commit 75c961e7ed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 99 additions and 48 deletions

View file

@ -27,12 +27,11 @@ extern "C" {
#include "base/pod_array.h"
#include "core/bloom.h"
#include "core/detail/bitpacking.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
ABSL_RETIRED_FLAG(bool, use_set2, true, "If true use DenseSet for an optimized set data structure");
ABSL_FLAG(bool, experimental_flat_json, false, "If true uses flat json implementation.");
namespace dfly {
@ -67,6 +66,19 @@ inline void FreeObjSet(unsigned encoding, void* ptr, MemoryResource* mr) {
}
}
void FreeList(unsigned encoding, void* ptr, MemoryResource* mr) {
switch (encoding) {
case OBJ_ENCODING_QUICKLIST:
quicklistRelease((quicklist*)ptr);
break;
case kEncodingQL2:
CompactObj::DeleteMR<QList>(ptr);
break;
default:
LOG(FATAL) << "Unknown list encoding type";
}
}
size_t MallocUsedSet(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap2: {
@ -288,8 +300,9 @@ size_t RobjWrapper::MallocUsed() const {
CHECK_EQ(OBJ_ENCODING_RAW, encoding_);
return InnerObjMallocUsed();
case OBJ_LIST:
DCHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
if (encoding_ == OBJ_ENCODING_QUICKLIST)
return QlMAllocSize((quicklist*)inner_obj_);
return ((QList*)inner_obj_)->MallocUsed();
case OBJ_SET:
return MallocUsedSet(encoding_, inner_obj_);
case OBJ_HASH:
@ -312,7 +325,9 @@ size_t RobjWrapper::Size() const {
DCHECK_EQ(OBJ_ENCODING_RAW, encoding_);
return sz_;
case OBJ_LIST:
if (encoding_ == OBJ_ENCODING_QUICKLIST)
return quicklistCount((quicklist*)inner_obj_);
return ((QList*)inner_obj_)->Size();
case OBJ_ZSET: {
switch (encoding_) {
case OBJ_ENCODING_SKIPLIST: {
@ -367,8 +382,7 @@ void RobjWrapper::Free(MemoryResource* mr) {
mr->deallocate(inner_obj_, 0, 8); // we do not keep the allocated size.
break;
case OBJ_LIST:
CHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
quicklistRelease((quicklist*)inner_obj_);
FreeList(encoding_, inner_obj_, mr);
break;
case OBJ_SET:
FreeObjSet(encoding_, inner_obj_, mr);

View file

@ -19,8 +19,8 @@
namespace dfly {
constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
constexpr unsigned kEncodingQL2 = 1;
constexpr unsigned kEncodingListPack = 3;
constexpr unsigned kEncodingJsonCons = 0;
constexpr unsigned kEncodingJsonFlat = 1;

View file

@ -14,6 +14,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "core/qlist.h"
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
@ -59,6 +60,8 @@ ABSL_FLAG(int32_t, list_max_listpack_size, -2, "Maximum listpack size, default i
*/
ABSL_FLAG(int32_t, list_compress_depth, 0, "Compress depth of the list. Default is no compression");
ABSL_FLAG(bool, list_experimental_v2, false,
"Compress depth of the list. Default is no compression");
namespace dfly {
@ -73,6 +76,10 @@ quicklist* GetQL(const PrimeValue& mv) {
return (quicklist*)mv.RObjPtr();
}
QList* GetQLV2(const PrimeValue& mv) {
return (QList*)mv.RObjPtr();
}
void* listPopSaver(unsigned char* data, size_t sz) {
return new string((char*)data, sz);
}
@ -265,27 +272,47 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
res = std::move(*op_res);
}
quicklist* ql = nullptr;
size_t len = 0;
DVLOG(1) << "OpPush " << key << " new_key " << res.is_new;
quicklist* ql = nullptr;
QList* ql_v2 = nullptr;
if (res.is_new) {
if (absl::GetFlag(FLAGS_list_experimental_v2)) {
ql_v2 = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
res.it->second.InitRobj(OBJ_LIST, kEncodingQL2, ql_v2);
} else {
ql = quicklistCreate();
quicklistSetOptions(ql, GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
res.it->second.InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
}
} else {
if (res.it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
if (res.it->second.Encoding() == kEncodingQL2) {
ql_v2 = GetQLV2(res.it->second);
} else {
ql = GetQL(res.it->second);
}
}
if (ql) {
// Left push is LIST_HEAD.
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
for (string_view v : vals) {
auto vsds = WrapSds(v);
quicklistPush(ql, vsds, sdslen(vsds), pos);
}
len = quicklistCount(ql);
} else {
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
for (string_view v : vals) {
ql_v2->Push(v, where);
}
len = ql_v2->Size();
}
if (res.is_new) {
auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(es->shard_id());
@ -305,7 +332,7 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
RecordJournal(op_args, command, mapped, 2);
}
return quicklistCount(ql);
return len;
}
OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count,
@ -320,26 +347,26 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u
auto it = it_res->it;
quicklist* ql = GetQL(it->second);
auto prev_len = quicklistCount(ql);
StringVec res;
if (quicklistCount(ql) < count) {
count = quicklistCount(ql);
if (prev_len < count) {
count = prev_len;
}
res.reserve(count);
if (return_results) {
for (unsigned i = 0; i < count; ++i) {
res.push_back(ListPop(dir, ql));
res.reserve(count);
}
} else {
for (unsigned i = 0; i < count; ++i) {
ListPop(dir, ql);
string val = ListPop(dir, ql);
if (return_results) {
res.push_back(std::move(val));
}
}
it_res->post_updater.Run();
if (quicklistCount(ql) == 0) {
if (count == prev_len) {
CHECK(db_slice.Del(op_args.db_cntx, it));
}
@ -418,8 +445,12 @@ OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key) {
if (!res)
return res.status();
quicklist* ql = GetQL(res.value()->second);
if (res.value()->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(res.value()->second);
return ql->Size();
}
quicklist* ql = GetQL(res.value()->second);
return quicklistCount(ql);
}
@ -427,6 +458,15 @@ OpResult<string> OpIndex(const OpArgs& op_args, std::string_view key, long index
auto res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST);
if (!res)
return res.status();
string str;
if (res.value()->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(res.value()->second);
auto it = ql->GetIterator(index);
if (!it.Next())
return OpStatus::KEY_NOTFOUND;
str = it.Get().to_string();
} else {
quicklist* ql = GetQL(res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = quicklistGetIteratorAtIdx(ql, AL_START_TAIL, index);
@ -434,7 +474,6 @@ OpResult<string> OpIndex(const OpArgs& op_args, std::string_view key, long index
return OpStatus::KEY_NOTFOUND;
quicklistNext(iter, &entry);
string str;
if (entry.value) {
str.assign(reinterpret_cast<char*>(entry.value), entry.sz);
@ -442,7 +481,7 @@ OpResult<string> OpIndex(const OpArgs& op_args, std::string_view key, long index
str = absl::StrCat(entry.longval);
}
quicklistReleaseIterator(iter);
}
return str;
}

View file

@ -178,7 +178,7 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
case OBJ_SET:
if (compact_enc == kEncodingIntSet)
return RDB_TYPE_SET_INTSET;
else if (compact_enc == kEncodingStrMap || compact_enc == kEncodingStrMap2) {
else if (compact_enc == kEncodingStrMap2) {
if (((StringSet*)pv.RObjPtr())->ExpirationUsed())
return RDB_TYPE_SET_WITH_EXPIRY;
else

View file

@ -26,8 +26,6 @@ extern "C" {
#include "server/journal/journal.h"
#include "server/transaction.h"
ABSL_DECLARE_FLAG(bool, use_set2);
namespace dfly {
using namespace facade;