1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: Remove robj reference from zset_family (#1554)

This is pure refactoring PR that does not change any functionality besides
prohibiting using AsRobj/SyncRobj functions for compact objects of type
OBJ_ZSET. This is needed in case we decide in the future to implement our own
zset type.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-07-17 09:34:12 +03:00 committed by GitHub
parent 3d6d9d99c7
commit 2c04311cc3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 319 additions and 226 deletions

View file

@ -137,9 +137,7 @@ inline void FreeObjZset(unsigned encoding, void* ptr) {
switch (encoding) {
case OBJ_ENCODING_SKIPLIST:
zs = (zset*)ptr;
dictRelease(zs->dict);
zslFree(zs->zsl);
zfree(zs);
zsetFree(zs);
break;
case OBJ_ENCODING_LISTPACK:
zfree(ptr);
@ -361,6 +359,19 @@ bool RobjWrapper::DefragIfNeeded(float ratio) {
return false;
}
int RobjWrapper::ZsetAdd(double score, sds ele, int in_flags, int* out_flags, double* newscore) {
robj self{.type = type_,
.encoding = encoding_,
.lru = 0,
.refcount = OBJ_STATIC_REFCOUNT,
.ptr = inner_obj_};
int res = zsetAdd(&self, score, ele, in_flags, out_flags, newscore);
inner_obj_ = self.ptr;
encoding_ = self.encoding;
return res;
}
bool RobjWrapper::Reallocate(MemoryResource* mr) {
void* old_ptr = inner_obj_;
inner_obj_ = mr->allocate(sz_, kAlignSize);
@ -531,6 +542,7 @@ unsigned CompactObj::Encoding() const {
void CompactObj::ImportRObj(robj* o) {
CHECK(1 == o->refcount || o->refcount == OBJ_STATIC_REFCOUNT);
CHECK_NE(o->encoding, OBJ_ENCODING_EMBSTR); // need regular one
CHECK_NE(o->type, OBJ_ZSET);
SetMeta(ROBJ_TAG);
@ -563,7 +575,7 @@ robj* CompactObj::AsRObj() const {
unsigned enc = u_.r_obj.encoding();
res->type = u_.r_obj.type();
if (res->type == OBJ_SET || res->type == OBJ_HASH) {
if (res->type == OBJ_SET || res->type == OBJ_HASH || res->type == OBJ_ZSET) {
LOG(DFATAL) << "Should not call AsRObj for type " << res->type;
}
@ -585,7 +597,8 @@ void CompactObj::SyncRObj() {
DCHECK_EQ(ROBJ_TAG, taglen_);
DCHECK_EQ(u_.r_obj.type(), obj->type);
CHECK_NE(OBJ_SET, obj->type) << "sets should be handled without robj";
DCHECK_NE(OBJ_SET, obj->type) << "sets should be handled without robj";
CHECK_NE(OBJ_ZSET, obj->type) << "zsets should be handled without robj";
unsigned enc = obj->encoding;
u_.r_obj.Init(obj->type, enc, obj->ptr);

View file

@ -58,12 +58,19 @@ class RobjWrapper {
return inner_obj_;
}
void set_inner_obj(void* ptr) {
inner_obj_ = ptr;
}
std::string_view AsView() const {
return std::string_view{reinterpret_cast<char*>(inner_obj_), sz_};
}
bool DefragIfNeeded(float ratio);
// as defined in zset.h
int ZsetAdd(double score, char* ele, int in_flags, int* out_flags, double* newscore);
private:
bool Reallocate(MemoryResource* mr);
size_t InnerObjMallocUsed() const;
@ -94,7 +101,13 @@ class CompactObj {
CompactObj(const CompactObj&) = delete;
// 0-16 is reserved for inline lengths of string type.
enum TagEnum { INT_TAG = 17, SMALL_TAG = 18, ROBJ_TAG = 19, EXTERNAL_TAG = 20, JSON_TAG = 21 };
enum TagEnum {
INT_TAG = 17,
SMALL_TAG = 18,
ROBJ_TAG = 19,
EXTERNAL_TAG = 20,
JSON_TAG = 21,
};
enum MaskBit {
REF_BIT = 1,
@ -245,7 +258,7 @@ class CompactObj {
robj* AsRObj() const;
// takes ownership over obj.
// takes ownership over obj_inner.
// type should not be OBJ_STRING.
void InitRobj(unsigned type, unsigned encoding, void* obj_inner);
@ -257,6 +270,15 @@ class CompactObj {
void SetInt(int64_t val);
std::optional<int64_t> TryGetInt() const;
// We temporary expose this function to avoid passing around robj objects.
detail::RobjWrapper* GetRobjWrapper() {
return &u_.r_obj;
}
const detail::RobjWrapper* GetRobjWrapper() const {
return &u_.r_obj;
}
// For STR object.
void SetString(std::string_view str);
void GetString(std::string* res) const;

View file

@ -377,8 +377,7 @@ TEST_F(CompactObjectTest, ZSet) {
"minstring";
EXPECT_EQ(9, sdslen(kMinStrData + 1));
robj* src = createZsetListpackObject();
cobj_.ImportRObj(src);
cobj_.InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lpNew(0));
EXPECT_EQ(OBJ_ZSET, cobj_.ObjType());
EXPECT_EQ(OBJ_ENCODING_LISTPACK, cobj_.Encoding());

View file

@ -269,24 +269,6 @@ robj *createHashObject(void) {
return o;
}
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;
zs->dict = dictCreate(&zsetDictType);
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}
robj *createZsetListpackObject(void) {
unsigned char *lp = lpNew(0);
robj *o = createObject(OBJ_ZSET,lp);
o->encoding = OBJ_ENCODING_LISTPACK;
return o;
}
robj *createStreamObject(void) {
stream *s = streamNew();
robj *o = createObject(OBJ_STREAM,s);

View file

@ -96,8 +96,6 @@ robj *createQuicklistObject(void);
robj *createSetObject(void);
robj *createIntsetObject(void);
robj *createHashObject(void);
robj *createZsetObject(void);
robj *createZsetListpackObject(void);
unsigned long long estimateObjectIdleTime(const robj *o);
uint8_t LFUDecrAndReturn(time_t epoch_sec, const robj *o);
void listTypeConvert(robj *subject, int enc);

View file

@ -1123,6 +1123,19 @@ unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsig
* Common sorted set API
*----------------------------------------------------------------------------*/
zset* zsetCreate(void) {
zset *zs = zmalloc(sizeof(*zs));
zs->dict = dictCreate(&zsetDictType);
zs->zsl = zslCreate();
return zs;
}
void zsetFree(zset *zs) {
dictRelease(zs->dict);
zslFree(zs->zsl);
zfree(zs);
}
unsigned long zsetLength(const robj *zobj) {
unsigned long length = 0;
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
@ -1423,7 +1436,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, dou
* returning 1 if the element existed and was deleted, 0 otherwise (the
* element was not there). It does not resize the dict after deleting the
* element. */
static int zsetRemoveFromSkiplist(zset *zs, sds ele) {
int zsetRemoveFromSkiplist(zset *zs, sds ele) {
dictEntry *de;
double score;
@ -1536,58 +1549,3 @@ long zsetRank(robj *zobj, sds ele, int reverse) {
serverPanic("Unknown sorted set encoding");
}
}
/* This is a helper function for the COPY command.
* Duplicate a sorted set object, with the guarantee that the returned object
* has the same encoding as the original one.
*
* The resulting object always has refcount set to 1 */
robj *zsetDup(robj *o) {
robj *zobj;
zset *zs;
zset *new_zs;
serverAssert(o->type == OBJ_ZSET);
/* Create a new sorted set object that have the same encoding as the original object's encoding */
if (o->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char *zl = o->ptr;
size_t sz = lpBytes(zl);
unsigned char *new_zl = zmalloc(sz);
memcpy(new_zl, zl, sz);
zobj = createObject(OBJ_ZSET, new_zl);
zobj->encoding = OBJ_ENCODING_LISTPACK;
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zobj = createZsetObject();
zs = o->ptr;
new_zs = zobj->ptr;
dictExpand(new_zs->dict,dictSize(zs->dict));
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
sds ele;
long llen = zsetLength(o);
/* We copy the skiplist elements from the greatest to the
* smallest (that's trivial since the elements are already ordered in
* the skiplist): this improves the load process, since the next loaded
* element will always be the smaller, so adding to the skiplist
* will always immediately stop at the head, making the insertion
* O(1) instead of O(log(N)). */
ln = zsl->tail;
while (llen--) {
ele = ln->ele;
sds new_ele = sdsdup(ele);
zskiplistNode *znode = zslInsert(new_zs->zsl,ln->score,new_ele);
dictAdd(new_zs->dict,new_ele,&znode->score);
ln = ln->backward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return zobj;
}
/* Create a new sds string from the listpack entry. */
sds zsetSdsFromListpackEntry(listpackEntry *e) {
return e->sval ? sdsnewlen(e->sval, e->slen) : sdsfromlonglong(e->lval);
}

View file

@ -69,9 +69,13 @@ zskiplistNode* zslLastInRange(zskiplist* zsl, const zrangespec* range);
// double zzlGetScore(unsigned char *sptr);
// void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
// void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
unsigned char *zzlFind(unsigned char *lp, sds ele, double *score);
unsigned char* zzlFirstInRange(unsigned char* zl, const zrangespec* range);
unsigned char* zzlLastInRange(unsigned char* zl, const zrangespec* range);
zset* zsetCreate(void);
void zsetFree(zset *o);
unsigned long zsetLength(const robj* zobj);
int zsetRemoveFromSkiplist(zset *zs, sds ele);
void zsetConvert(robj* zobj, int encoding);
void zsetConvertToZiplistIfNeeded(robj* zobj, size_t maxelelen);
int zsetScore(robj* zobj, sds member, double* score);

View file

@ -90,16 +90,16 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func) {
return success;
}
bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start, int32_t end,
bool reverse, bool use_score) {
unsigned long llen = zsetLength(zobj);
bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSortedFunc& func,
int32_t start, int32_t end, bool reverse, bool use_score) {
unsigned long llen = robj_wrapper->Size();
if (end < 0 || unsigned(end) >= llen)
end = llen - 1;
unsigned rangelen = unsigned(end - start) + 1;
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = static_cast<uint8_t*>(zobj->ptr);
if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = static_cast<uint8_t*>(robj_wrapper->inner_obj());
uint8_t *eptr, *sptr;
uint8_t* vstr;
unsigned int vlen;
@ -138,15 +138,15 @@ bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start,
}
return success;
} else {
CHECK_EQ(zobj->encoding, OBJ_ENCODING_SKIPLIST);
zset* zs = static_cast<zset*>(zobj->ptr);
CHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST);
zset* zs = static_cast<zset*>(robj_wrapper->inner_obj());
zskiplist* zsl = zs->zsl;
zskiplistNode* ln;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (reverse) {
ln = zsl->tail;
unsigned long llen = zsetLength(zobj);
unsigned long llen = robj_wrapper->Size();
if (start > 0)
ln = zslGetElementByRank(zsl, llen - start);
} else {

View file

@ -69,8 +69,9 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func);
// Iterate over all values and call func(val). Iteration stops as soon
// as func return false. Returns true if it successfully processed all elements
// without stopping.
bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start = 0,
int32_t end = -1, bool reverse = false, bool use_score = false);
bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSortedFunc& func,
int32_t start = 0, int32_t end = -1, bool reverse = false,
bool use_score = false);
// Get StringMap pointer from primetable value. Sets expire time from db_context
StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context);

View file

@ -923,7 +923,8 @@ template <typename F> bool Iterate(const PrimeValue& pv, F&& func) {
return container_utils::IterateSet(pv, cb);
case OBJ_ZSET:
return container_utils::IterateSortedSet(
pv.AsRObj(), [&cb](container_utils::ContainerEntry ce, double) { return cb(ce); });
pv.GetRobjWrapper(),
[&cb](container_utils::ContainerEntry ce, double) { return cb(ce); });
default:
return false;
}

View file

@ -69,6 +69,76 @@ inline void YieldIfNeeded(size_t i) {
}
}
// taken from zset.c
unsigned char* zzlInsertAt(unsigned char* zl, unsigned char* eptr, sds ele, double score) {
unsigned char* sptr;
char scorebuf[128];
int scorelen;
scorelen = d2string(scorebuf, sizeof(scorebuf), score);
if (eptr == NULL) {
zl = lpAppend(zl, (unsigned char*)ele, sdslen(ele));
zl = lpAppend(zl, (unsigned char*)scorebuf, scorelen);
} else {
/* Insert member before the element 'eptr'. */
zl = lpInsertString(zl, (unsigned char*)ele, sdslen(ele), eptr, LP_BEFORE, &sptr);
/* Insert score after the member. */
zl = lpInsertString(zl, (unsigned char*)scorebuf, scorelen, sptr, LP_AFTER, NULL);
}
return zl;
}
// taken from zset.c
uint8_t* ToListPack(const zskiplist* zsl) {
uint8_t* lp = lpNew(0);
/* Approach similar to zslFree(), since we want to free the skiplist at
* the same time as creating the listpack. */
zskiplistNode* node = zsl->header->level[0].forward;
while (node) {
lp = zzlInsertAt(lp, NULL, node->ele, node->score);
node = node->level[0].forward;
}
return lp;
}
// taken from zsetConvert
zset* FromListPack(const uint8_t* lp) {
uint8_t* zl = (uint8_t*)lp;
unsigned char *eptr, *sptr;
unsigned char* vstr;
unsigned int vlen;
long long vlong;
sds ele;
eptr = lpSeek(zl, 0);
if (eptr != NULL) {
sptr = lpNext(zl, eptr);
CHECK(sptr != NULL);
}
zset* zs = zsetCreate();
while (eptr != NULL) {
double score = zzlGetScore(sptr);
vstr = lpGetValue(eptr, &vlen, &vlong);
if (vstr == NULL)
ele = sdsfromlonglong(vlong);
else
ele = sdsnewlen((char*)vstr, vlen);
zskiplistNode* node = zslInsert(zs->zsl, score, ele);
CHECK_EQ(DICT_OK, dictAdd(zs->dict, ele, &node->score));
zzlNext(zl, &eptr, &sptr);
}
return zs;
}
class error_category : public std::error_category {
public:
const char* name() const noexcept final {
@ -693,10 +763,8 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
}
void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
robj* res = createZsetObject();
zset* zs = (zset*)res->ptr;
auto cleanup = absl::Cleanup([&] { decrRefCount(res); });
zset* zs = zsetCreate();
auto cleanup = absl::Cleanup([&] { zsetFree(zs); });
size_t zsetlen = ltrace->blob_count();
if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict, zsetlen) != DICT_OK) {
@ -734,15 +802,18 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
if (ec_)
return;
/* Convert *after* loading, since sorted sets are not stored ordered. */
if (zsetLength(res) <= server.zset_max_listpack_entries &&
unsigned enc = OBJ_ENCODING_SKIPLIST;
void* inner = zs;
if (zs->zsl->length <= server.zset_max_listpack_entries &&
maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) {
zsetConvert(res, OBJ_ENCODING_LISTPACK);
enc = OBJ_ENCODING_LISTPACK;
inner = ToListPack(zs->zsl);
} else {
std::move(cleanup).Cancel();
}
std::move(cleanup).Cancel();
pv_->ImportRObj(res);
pv_->InitRobj(OBJ_ZSET, enc, inner);
}
void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
@ -866,7 +937,6 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
return;
}
robj* res = nullptr;
if (rdb_type_ == RDB_TYPE_SET_INTSET) {
if (!intsetValidateIntegrity((const uint8_t*)blob.data(), blob.size(), 0)) {
LOG(ERROR) << "Intset integrity check failed.";
@ -877,6 +947,8 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
const intset* is = (const intset*)blob.data();
unsigned len = intsetLen(is);
robj* res = nullptr;
if (len > SetFamily::MaxIntsetEntries()) {
res = createSetObject();
if (!SetFamily::ConvertToStrSet(is, len, res)) {
@ -891,6 +963,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
res = createObject(OBJ_SET, mine);
res->encoding = OBJ_ENCODING_INTSET;
}
pv_->ImportRObj(res);
} else if (rdb_type_ == RDB_TYPE_HASH_ZIPLIST) {
unsigned char* lp = lpNew(blob.size());
if (!ziplistPairsConvertAndValidateIntegrity((const uint8_t*)blob.data(), blob.size(), &lp)) {
@ -930,18 +1003,20 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
return;
}
res = createObject(OBJ_ZSET, lp);
res->encoding = OBJ_ENCODING_LISTPACK;
if (lpBytes(lp) > server.zset_max_listpack_entries)
zsetConvert(res, OBJ_ENCODING_SKIPLIST);
else
res->ptr = lpShrinkToFit(lp);
unsigned encoding = OBJ_ENCODING_LISTPACK;
void* inner;
if (lpBytes(lp) > server.zset_max_listpack_entries) {
inner = FromListPack(lp);
zfree(lp);
encoding = OBJ_ENCODING_SKIPLIST;
} else {
inner = lpShrinkToFit(lp);
}
pv_->InitRobj(OBJ_ZSET, encoding, inner);
return;
} else {
LOG(FATAL) << "Unsupported rdb type " << rdb_type_;
}
pv_->ImportRObj(res);
}
sds RdbLoaderBase::OpaqueObjLoader::ToSds(const RdbVariant& obj) {

View file

@ -321,7 +321,7 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
}
if (obj_type == OBJ_ZSET) {
return SaveZSetObject(pv.AsRObj());
return SaveZSetObject(pv);
}
if (obj_type == OBJ_STREAM) {
@ -441,10 +441,11 @@ error_code RdbSerializer::SaveHSetObject(const PrimeValue& pv) {
return error_code{};
}
error_code RdbSerializer::SaveZSetObject(const robj* obj) {
DCHECK_EQ(OBJ_ZSET, obj->type);
if (obj->encoding == OBJ_ENCODING_SKIPLIST) {
zset* zs = (zset*)obj->ptr;
error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) {
DCHECK_EQ(OBJ_ZSET, pv.ObjType());
const detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper();
if (pv.Encoding() == OBJ_ENCODING_SKIPLIST) {
zset* zs = (zset*)robj_wrapper->inner_obj();
zskiplist* zsl = zs->zsl;
RETURN_ON_ERR(SaveLen(zsl->length));
@ -462,8 +463,8 @@ error_code RdbSerializer::SaveZSetObject(const robj* obj) {
zn = zn->backward;
}
} else {
CHECK_EQ(obj->encoding, unsigned(OBJ_ENCODING_LISTPACK)) << "Unknown zset encoding";
uint8_t* lp = (uint8_t*)obj->ptr;
CHECK_EQ(pv.Encoding(), unsigned(OBJ_ENCODING_LISTPACK)) << "Unknown zset encoding";
uint8_t* lp = (uint8_t*)robj_wrapper->inner_obj();
RETURN_ON_ERR(SaveListPackAsZiplist(lp));
}

View file

@ -168,7 +168,7 @@ class RdbSerializer {
std::error_code SaveListObject(const robj* obj);
std::error_code SaveSetObject(const PrimeValue& pv);
std::error_code SaveHSetObject(const PrimeValue& pv);
std::error_code SaveZSetObject(const robj* obj);
std::error_code SaveZSetObject(const PrimeValue& pv);
std::error_code SaveStreamObject(const robj* obj);
std::error_code SaveJsonObject(const PrimeValue& pv);

View file

@ -9,6 +9,7 @@ extern "C" {
#include "redis/geohash_helper.h"
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/util.h"
#include "redis/zset.h"
}
@ -82,6 +83,53 @@ zlexrangespec GetLexRange(bool reverse, const ZSetFamily::LexInterval& li) {
return range;
}
/* Delete the element 'ele' from the sorted set, returning 1 if the element
* existed and was deleted, 0 otherwise (the element was not there).
* taken from t_zset.c
*/
int ZsetDel(detail::RobjWrapper* robj_wrapper, sds ele) {
if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) {
unsigned char* eptr;
uint8_t* lp = (uint8_t*)robj_wrapper->inner_obj();
if ((eptr = zzlFind(lp, ele, NULL)) != NULL) {
lp = lpDeleteRangeWithEntry(lp, &eptr, 2);
robj_wrapper->set_inner_obj(lp);
return 1;
}
} else if (robj_wrapper->encoding() == OBJ_ENCODING_SKIPLIST) {
zset* zs = (zset*)robj_wrapper->inner_obj();
if (zsetRemoveFromSkiplist(zs, ele)) {
if (htNeedsResize(zs->dict))
dictResize(zs->dict);
return 1;
}
}
return 0; /* No such element found. */
}
// taken from t_zset.c
std::optional<double> GetZsetScore(detail::RobjWrapper* robj_wrapper, sds member) {
if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) {
double score;
if (zzlFind((uint8_t*)robj_wrapper->inner_obj(), member, &score) == NULL)
return std::nullopt;
return score;
}
if (robj_wrapper->encoding() == OBJ_ENCODING_SKIPLIST) {
zset* zs = (zset*)robj_wrapper->inner_obj();
dictEntry* de = dictFind(zs->dict, member);
if (de == NULL)
return std::nullopt;
return *(double*)dictGetVal(de);
}
LOG(FATAL) << "Unknown sorted set encoding";
return 0;
}
struct ZParams {
unsigned flags = 0; // mask of ZADD_IN_ macros.
bool ch = false; // Corresponds to CH option.
@ -104,20 +152,20 @@ OpResult<PrimeIterator> FindZEntry(const ZParams& zparams, const OpArgs& op_args
}
PrimeIterator& it = add_res.first;
if (add_res.second || zparams.override) {
robj* zobj = nullptr;
PrimeValue& pv = it->second;
if (add_res.second || zparams.override) {
if (member_len > kMaxListPackValue) {
zobj = createZsetObject();
zset* zs = zsetCreate();
pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_SKIPLIST, zs);
} else {
zobj = createZsetListpackObject();
unsigned char* lp = lpNew(0);
pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lp);
}
DVLOG(2) << "Created zset " << zobj->ptr;
if (!add_res.second) {
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
}
it->second.ImportRObj(zobj);
} else {
if (it->second.ObjType() != OBJ_ZSET)
return OpStatus::WRONG_TYPE;
@ -176,8 +224,8 @@ enum class Action { RANGE = 0, REMOVE = 1, POP = 2 };
class IntervalVisitor {
public:
IntervalVisitor(Action action, const ZSetFamily::RangeParams& params, robj* o)
: action_(action), params_(params), zobj_(o) {
IntervalVisitor(Action action, const ZSetFamily::RangeParams& params, PrimeValue* pv)
: action_(action), params_(params), robj_wrapper_(pv->GetRobjWrapper()) {
}
void operator()(const ZSetFamily::IndexInterval& ii);
@ -236,14 +284,14 @@ class IntervalVisitor {
Action action_;
ZSetFamily::RangeParams params_;
robj* zobj_;
detail::RobjWrapper* robj_wrapper_;
ZSetFamily::ScoredArray result_;
unsigned removed_ = 0;
};
void IntervalVisitor::operator()(const ZSetFamily::IndexInterval& ii) {
unsigned long llen = zsetLength(zobj_);
unsigned long llen = robj_wrapper_->Size();
int32_t start = ii.first;
int32_t end = ii.second;
@ -322,7 +370,7 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) {
end = static_cast<uint32_t>(min(1ULL * start + params_.limit - 1, 1ULL * end));
container_utils::IterateSortedSet(
zobj_,
robj_wrapper_,
[this](container_utils::ContainerEntry ce, double score) {
result_.emplace_back(ce.ToString(), score);
return true;
@ -331,76 +379,76 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) {
}
void IntervalVisitor::ActionRange(const zrangespec& range) {
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) {
ExtractListPack(range);
} else {
CHECK_EQ(zobj_->encoding, OBJ_ENCODING_SKIPLIST);
CHECK_EQ(robj_wrapper_->encoding(), OBJ_ENCODING_SKIPLIST);
ExtractSkipList(range);
}
}
void IntervalVisitor::ActionRange(const zlexrangespec& range) {
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) {
ExtractListPack(range);
} else {
CHECK_EQ(zobj_->encoding, OBJ_ENCODING_SKIPLIST);
CHECK_EQ(robj_wrapper_->encoding(), OBJ_ENCODING_SKIPLIST);
ExtractSkipList(range);
}
}
void IntervalVisitor::ActionRem(unsigned start, unsigned end) {
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj();
removed_ = (end - start) + 1;
zl = lpDeleteRange(zl, 2 * start, 2 * removed_);
zobj_->ptr = zl;
robj_wrapper_->set_inner_obj(zl);
} else {
CHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj_->encoding);
zset* zs = (zset*)zobj_->ptr;
CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding());
zset* zs = (zset*)robj_wrapper_->inner_obj();
removed_ = zslDeleteRangeByRank(zs->zsl, start + 1, end + 1, zs->dict);
}
}
void IntervalVisitor::ActionRem(const zrangespec& range) {
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj();
unsigned long deleted = 0;
zl = zzlDeleteRangeByScore(zl, &range, &deleted);
zobj_->ptr = zl;
robj_wrapper_->set_inner_obj(zl);
removed_ = deleted;
} else {
CHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj_->encoding);
zset* zs = (zset*)zobj_->ptr;
CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding());
zset* zs = (zset*)robj_wrapper_->inner_obj();
removed_ = zslDeleteRangeByScore(zs->zsl, &range, zs->dict);
}
}
void IntervalVisitor::ActionRem(const zlexrangespec& range) {
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj();
unsigned long deleted = 0;
zl = zzlDeleteRangeByLex(zl, &range, &deleted);
zobj_->ptr = zl;
robj_wrapper_->set_inner_obj(zl);
removed_ = deleted;
} else {
CHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj_->encoding);
zset* zs = (zset*)zobj_->ptr;
CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding());
zset* zs = (zset*)robj_wrapper_->inner_obj();
removed_ = zslDeleteRangeByLex(zs->zsl, &range, zs->dict);
}
}
void IntervalVisitor::ActionPop(ZSetFamily::TopNScored sc) {
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) {
PopListPack(sc);
} else {
CHECK_EQ(zobj_->encoding, OBJ_ENCODING_SKIPLIST);
CHECK_EQ(robj_wrapper_->encoding(), OBJ_ENCODING_SKIPLIST);
PopSkipList(sc);
}
}
void IntervalVisitor::ExtractListPack(const zrangespec& range) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj();
uint8_t *eptr, *sptr;
uint8_t* vstr;
unsigned int vlen = 0;
@ -444,7 +492,7 @@ void IntervalVisitor::ExtractListPack(const zrangespec& range) {
}
void IntervalVisitor::ExtractSkipList(const zrangespec& range) {
zset* zs = (zset*)zobj_->ptr;
zset* zs = (zset*)robj_wrapper_->inner_obj();
zskiplist* zsl = zs->zsl;
zskiplistNode* ln;
unsigned offset = params_.offset;
@ -476,7 +524,7 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) {
}
void IntervalVisitor::ExtractListPack(const zlexrangespec& range) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj();
uint8_t *eptr, *sptr = nullptr;
uint8_t* vstr = nullptr;
unsigned int vlen = 0;
@ -524,7 +572,7 @@ void IntervalVisitor::ExtractListPack(const zlexrangespec& range) {
}
void IntervalVisitor::ExtractSkipList(const zlexrangespec& range) {
zset* zs = (zset*)zobj_->ptr;
zset* zs = (zset*)robj_wrapper_->inner_obj();
zskiplist* zsl = zs->zsl;
zskiplistNode* ln;
unsigned offset = params_.offset;
@ -561,7 +609,7 @@ void IntervalVisitor::ExtractSkipList(const zlexrangespec& range) {
}
void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj();
uint8_t *eptr, *sptr;
uint8_t* vstr;
unsigned int vlen = 0;
@ -596,11 +644,11 @@ void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) {
}
/* We can finally delete the elements */
zobj_->ptr = lpDeleteRange(zl, start, 2 * sc);
robj_wrapper_->set_inner_obj(lpDeleteRange(zl, start, 2 * sc));
}
void IntervalVisitor::PopSkipList(ZSetFamily::TopNScored sc) {
zset* zs = (zset*)zobj_->ptr;
zset* zs = (zset*)robj_wrapper_->inner_obj();
zskiplist* zsl = zs->zsl;
zskiplistNode* ln;
@ -615,7 +663,7 @@ void IntervalVisitor::PopSkipList(ZSetFamily::TopNScored sc) {
result_.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score);
/* we can delete the element now */
zsetDel(zobj_, ln->ele);
ZsetDel(robj_wrapper_, ln->ele);
ln = Next(ln);
}
@ -687,11 +735,10 @@ void SendAtLeastOneKeyError(ConnectionContext* cntx) {
enum class AggType : uint8_t { SUM, MIN, MAX, NOOP };
using ScoredMap = absl::flat_hash_map<std::string, double>;
ScoredMap FromObject(const CompactObj& co, double weight) {
robj* obj = co.AsRObj();
ScoredMap FromObject(CompactObj& co, double weight) {
ZSetFamily::RangeParams params;
params.with_scores = true;
IntervalVisitor vis(Action::RANGE, params, obj);
IntervalVisitor vis(Action::RANGE, params, &co);
vis(ZSetFamily::IndexInterval(0, -1));
ZSetFamily::ScoredArray arr = vis.PopResult();
@ -935,8 +982,6 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
if (!res_it)
return res_it.status();
robj* zobj = res_it.value()->second.AsRObj();
unsigned added = 0;
unsigned updated = 0;
unsigned processed = 0;
@ -947,12 +992,12 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
OpStatus op_status = OpStatus::OK;
AddResult aresult;
detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
for (size_t j = 0; j < members.size(); j++) {
const auto& m = members[j];
tmp_str = sdscpylen(tmp_str, m.second.data(), m.second.size());
int retval = zsetAdd(zobj, m.first, tmp_str, zparams.flags, &retflags, &new_score);
int retval = robj_wrapper->ZsetAdd(m.first, tmp_str, zparams.flags, &retflags, &new_score);
if (zparams.flags & ZADD_IN_INCR) {
if (retval == 0) {
@ -975,9 +1020,6 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
processed++;
}
DVLOG(2) << "ZAdd " << zobj->ptr;
res_it.value()->second.SyncRObj();
op_args.shard->db_slice().PostUpdate(op_args.db_cntx.db_index, *res_it, key);
if (zparams.flags & ZADD_IN_INCR) {
@ -1191,15 +1233,14 @@ ZSetFamily::ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_
DVLOG(2) << "popping from " << key << " " << t->DebugId();
db_slice.PreUpdate(t->GetDbIndex(), it);
robj* zobj = it_res.value()->second.AsRObj();
IntervalVisitor iv{Action::POP, range_spec.params, zobj};
PrimeValue& pv = it->second;
IntervalVisitor iv{Action::POP, range_spec.params, &pv};
std::visit(iv, range_spec.interval);
it_res.value()->second.SyncRObj();
db_slice.PostUpdate(t->GetDbIndex(), *it_res, key);
auto zlen = zsetLength(zobj);
auto zlen = pv.Size();
if (zlen == 0) {
DVLOG(1) << "deleting key " << key << " " << t->DebugId();
CHECK(db_slice.Del(t->GetDbIndex(), *it_res));
@ -1292,15 +1333,14 @@ auto OpPopCount(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args,
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
robj* zobj = res_it.value()->second.AsRObj();
PrimeValue& pv = res_it.value()->second;
IntervalVisitor iv{Action::POP, range_spec.params, zobj};
IntervalVisitor iv{Action::POP, range_spec.params, &pv};
std::visit(iv, range_spec.interval);
res_it.value()->second.SyncRObj();
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
auto zlen = zsetLength(zobj);
auto zlen = pv.Size();
if (zlen == 0) {
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
}
@ -1314,8 +1354,8 @@ auto OpRange(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, st
if (!res_it)
return res_it.status();
robj* zobj = res_it.value()->second.AsRObj();
IntervalVisitor iv{Action::RANGE, range_spec.params, zobj};
PrimeValue& pv = res_it.value()->second;
IntervalVisitor iv{Action::RANGE, range_spec.params, &pv};
std::visit(iv, range_spec.interval);
@ -1331,15 +1371,13 @@ OpResult<unsigned> OpRemRange(const OpArgs& op_args, string_view key,
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
robj* zobj = res_it.value()->second.AsRObj();
IntervalVisitor iv{Action::REMOVE, range_spec.params, zobj};
PrimeValue& pv = res_it.value()->second;
IntervalVisitor iv{Action::REMOVE, range_spec.params, &pv};
std::visit(iv, range_spec.interval);
res_it.value()->second.SyncRObj();
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
auto zlen = zsetLength(zobj);
auto zlen = pv.Size();
if (zlen == 0) {
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
}
@ -1353,10 +1391,17 @@ OpResult<unsigned> OpRank(const OpArgs& op_args, string_view key, string_view me
if (!res_it)
return res_it.status();
robj* zobj = res_it.value()->second.AsRObj();
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, member.data(), member.size());
detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
robj self{
.type = OBJ_ZSET,
.encoding = robj_wrapper->encoding(),
.lru = 0,
.refcount = OBJ_STATIC_REFCOUNT,
.ptr = robj_wrapper->inner_obj(),
};
long res = zsetRank(zobj, op_args.shard->tmp_str1, reverse);
long res = zsetRank(&self, op_args.shard->tmp_str1, reverse);
if (res < 0)
return OpStatus::KEY_NOTFOUND;
return res;
@ -1368,12 +1413,12 @@ OpResult<unsigned> OpCount(const OpArgs& op_args, std::string_view key,
if (!res_it)
return res_it.status();
robj* zobj = res_it.value()->second.AsRObj();
detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
zrangespec range = GetZrangeSpec(false, interval);
unsigned count = 0;
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)zobj->ptr;
if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)robj_wrapper->inner_obj();
uint8_t *eptr, *sptr;
double score;
@ -1404,8 +1449,8 @@ OpResult<unsigned> OpCount(const OpArgs& op_args, std::string_view key,
}
}
} else {
CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), zobj->encoding);
zset* zs = (zset*)zobj->ptr;
CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), robj_wrapper->encoding());
zset* zs = (zset*)robj_wrapper->inner_obj();
zskiplist* zsl = zs->zsl;
zskiplistNode* zn;
unsigned long rank;
@ -1439,11 +1484,12 @@ OpResult<unsigned> OpLexCount(const OpArgs& op_args, string_view key,
if (!res_it)
return res_it.status();
robj* zobj = res_it.value()->second.AsRObj();
zlexrangespec range = GetLexRange(false, interval);
unsigned count = 0;
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)zobj->ptr;
detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)robj_wrapper->inner_obj();
uint8_t *eptr, *sptr;
/* Use the first element in range as the starting point */
@ -1453,7 +1499,7 @@ OpResult<unsigned> OpLexCount(const OpArgs& op_args, string_view key,
if (eptr) {
/* First element is in range */
sptr = lpNext(zl, eptr);
serverAssertWithInfo(c, zobj, zzlLexValueLteMax(eptr, &range));
serverAssertWithInfo(c, robj_wrapper, zzlLexValueLteMax(eptr, &range));
/* Iterate over elements in range */
while (eptr) {
@ -1467,8 +1513,8 @@ OpResult<unsigned> OpLexCount(const OpArgs& op_args, string_view key,
}
}
} else {
DCHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj->encoding);
zset* zs = (zset*)zobj->ptr;
DCHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper->encoding());
zset* zs = (zset*)robj_wrapper->inner_obj();
zskiplist* zsl = zs->zsl;
zskiplistNode* zn;
unsigned long rank;
@ -1503,15 +1549,14 @@ OpResult<unsigned> OpRem(const OpArgs& op_args, string_view key, ArgSlice member
return res_it.status();
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
robj* zobj = res_it.value()->second.AsRObj();
detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
sds& tmp_str = op_args.shard->tmp_str1;
unsigned deleted = 0;
for (string_view member : members) {
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
deleted += zsetDel(zobj, tmp_str);
deleted += ZsetDel(robj_wrapper, tmp_str);
}
auto zlen = zsetLength(zobj);
res_it.value()->second.SyncRObj();
auto zlen = robj_wrapper->Size();
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
if (zlen == 0) {
@ -1526,15 +1571,15 @@ OpResult<double> OpScore(const OpArgs& op_args, string_view key, string_view mem
if (!res_it)
return res_it.status();
robj* zobj = res_it.value()->second.AsRObj();
PrimeValue& pv = res_it.value()->second;
sds& tmp_str = op_args.shard->tmp_str1;
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
double score;
int retval = zsetScore(zobj, tmp_str, &score);
if (retval != C_OK) {
detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper();
auto res = GetZsetScore(robj_wrapper, tmp_str);
if (!res)
return OpStatus::KEY_NOTFOUND;
}
return score;
return *res;
}
OpResult<MScoreResponse> OpMScore(const OpArgs& op_args, string_view key, ArgSlice members) {
@ -1544,20 +1589,14 @@ OpResult<MScoreResponse> OpMScore(const OpArgs& op_args, string_view key, ArgSli
MScoreResponse scores(members.size());
robj* zobj = res_it.value()->second.AsRObj();
detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
sds& tmp_str = op_args.shard->tmp_str1;
for (size_t i = 0; i < members.size(); i++) {
const auto& m = members[i];
tmp_str = sdscpylen(tmp_str, m.data(), m.size());
double score;
int retval = zsetScore(zobj, tmp_str, &score);
if (retval == C_OK) {
scores[i] = score;
} else {
scores[i] = std::nullopt;
}
scores[i] = GetZsetScore(robj_wrapper, tmp_str);
}
return scores;
@ -1571,14 +1610,14 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t
return find_res.status();
PrimeIterator it = find_res.value();
PrimeValue& pv = it->second;
StringVec res;
robj* zobj = it->second.AsRObj();
char buf[128];
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
if (pv.Encoding() == OBJ_ENCODING_LISTPACK) {
ZSetFamily::RangeParams params;
params.with_scores = true;
IntervalVisitor iv{Action::RANGE, params, zobj};
IntervalVisitor iv{Action::RANGE, params, &pv};
iv(ZSetFamily::IndexInterval{0, kuint32max});
ZSetFamily::ScoredArray arr = iv.PopResult();
@ -1593,9 +1632,9 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t
}
*cursor = 0;
} else {
CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), zobj->encoding);
CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), pv.Encoding());
uint32_t count = scan_op.limit;
zset* zs = (zset*)zobj->ptr;
zset* zs = (zset*)pv.RObjPtr();
dict* ht = zs->dict;
long maxiterations = count * 10;
@ -1749,7 +1788,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) {
return find_res.status();
}
return zsetLength(find_res.value()->second.AsRObj());
return find_res.value()->second.Size();
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));