mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat(server): Account for RObj concrete objects (#2324)
* feat(server): Account for RObj concrete objects * proper `delete`
This commit is contained in:
parent
4929e3c777
commit
34986fc52e
8 changed files with 37 additions and 29 deletions
|
@ -56,7 +56,7 @@ inline void FreeObjSet(unsigned encoding, void* ptr, MemoryResource* mr) {
|
|||
break;
|
||||
}
|
||||
case kEncodingStrMap2: {
|
||||
delete (StringSet*)ptr;
|
||||
CompactObj::DeleteMR<StringSet>(ptr);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -74,7 +74,7 @@ size_t MallocUsedSet(unsigned encoding, void* ptr) {
|
|||
return 0; // TODO
|
||||
case kEncodingStrMap2: {
|
||||
StringSet* ss = (StringSet*)ptr;
|
||||
return ss->ObjMallocUsed() + ss->SetMallocUsed();
|
||||
return ss->ObjMallocUsed() + ss->SetMallocUsed() + zmalloc_usable_size(ptr);
|
||||
}
|
||||
case kEncodingIntSet:
|
||||
return intsetBlobLen((intset*)ptr);
|
||||
|
@ -90,7 +90,7 @@ size_t MallocUsedHSet(unsigned encoding, void* ptr) {
|
|||
return zmalloc_usable_size(reinterpret_cast<uint8_t*>(ptr));
|
||||
case kEncodingStrMap2: {
|
||||
StringMap* sm = (StringMap*)ptr;
|
||||
return sm->ObjMallocUsed() + sm->SetMallocUsed();
|
||||
return sm->ObjMallocUsed() + sm->SetMallocUsed() + zmalloc_usable_size(ptr);
|
||||
}
|
||||
}
|
||||
LOG(DFATAL) << "Unknown set encoding type " << encoding;
|
||||
|
@ -103,7 +103,7 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) {
|
|||
return zmalloc_usable_size(reinterpret_cast<uint8_t*>(ptr));
|
||||
case OBJ_ENCODING_SKIPLIST: {
|
||||
detail::SortedMap* ss = (detail::SortedMap*)ptr;
|
||||
return ss->MallocSize(); // DictMallocSize(zs->dict);
|
||||
return ss->MallocSize() + zmalloc_usable_size(ptr); // DictMallocSize(zs->dict);
|
||||
}
|
||||
}
|
||||
LOG(DFATAL) << "Unknown set encoding type " << encoding;
|
||||
|
@ -118,7 +118,7 @@ size_t MallocUsedStream(unsigned encoding, void* streamv) {
|
|||
inline void FreeObjHash(unsigned encoding, void* ptr) {
|
||||
switch (encoding) {
|
||||
case kEncodingStrMap2:
|
||||
delete ((StringMap*)ptr);
|
||||
CompactObj::DeleteMR<StringMap>(ptr);
|
||||
break;
|
||||
case kEncodingListPack:
|
||||
lpFree((uint8_t*)ptr);
|
||||
|
@ -129,10 +129,9 @@ inline void FreeObjHash(unsigned encoding, void* ptr) {
|
|||
}
|
||||
|
||||
inline void FreeObjZset(unsigned encoding, void* ptr) {
|
||||
detail::SortedMap* zs = (detail::SortedMap*)ptr;
|
||||
switch (encoding) {
|
||||
case OBJ_ENCODING_SKIPLIST:
|
||||
delete zs;
|
||||
CompactObj::DeleteMR<detail::SortedMap>(ptr);
|
||||
break;
|
||||
case OBJ_ENCODING_LISTPACK:
|
||||
zfree(ptr);
|
||||
|
@ -472,9 +471,8 @@ int RobjWrapper::ZsetAdd(double score, sds ele, int in_flags, int* out_flags, do
|
|||
* becomes too long *before* executing zzlInsert. */
|
||||
if (zl_len >= server.zset_max_listpack_entries ||
|
||||
sdslen(ele) > server.zset_max_listpack_value) {
|
||||
unique_ptr<SortedMap> ss = SortedMap::FromListPack(tl.local_mr, lp);
|
||||
inner_obj_ = SortedMap::FromListPack(tl.local_mr, lp);
|
||||
lpFree(lp);
|
||||
inner_obj_ = ss.release();
|
||||
encoding_ = OBJ_ENCODING_SKIPLIST;
|
||||
} else {
|
||||
lp = detail::ZzlInsert(lp, ele, score);
|
||||
|
|
|
@ -324,6 +324,17 @@ class CompactObj {
|
|||
static void InitThreadLocal(MemoryResource* mr);
|
||||
static MemoryResource* memory_resource(); // thread-local.
|
||||
|
||||
template <typename T> static T* AllocateMR() {
|
||||
void* ptr = memory_resource()->allocate(sizeof(T), alignof(T));
|
||||
return new (ptr) T{memory_resource()};
|
||||
}
|
||||
|
||||
template <typename T> static void DeleteMR(void* ptr) {
|
||||
T* t = (T*)ptr;
|
||||
t->~T();
|
||||
memory_resource()->deallocate(ptr, alignof(T));
|
||||
}
|
||||
|
||||
private:
|
||||
size_t DecodedLen(size_t sz) const;
|
||||
|
||||
|
|
|
@ -1067,7 +1067,7 @@ SortedMap::~SortedMap() {
|
|||
}
|
||||
|
||||
// taken from zsetConvert
|
||||
unique_ptr<SortedMap> SortedMap::FromListPack(PMR_NS::memory_resource* res, const uint8_t* lp) {
|
||||
SortedMap* SortedMap::FromListPack(PMR_NS::memory_resource* res, const uint8_t* lp) {
|
||||
uint8_t* zl = (uint8_t*)lp;
|
||||
unsigned char *eptr, *sptr;
|
||||
unsigned char* vstr;
|
||||
|
@ -1075,7 +1075,8 @@ unique_ptr<SortedMap> SortedMap::FromListPack(PMR_NS::memory_resource* res, cons
|
|||
long long vlong;
|
||||
sds ele;
|
||||
|
||||
unique_ptr<SortedMap> zs(new SortedMap(res));
|
||||
void* ptr = res->allocate(sizeof(SortedMap), alignof(SortedMap));
|
||||
SortedMap* zs = new (ptr) SortedMap{res};
|
||||
|
||||
eptr = lpSeek(zl, 0);
|
||||
if (eptr != NULL) {
|
||||
|
|
|
@ -45,8 +45,9 @@ class SortedMap {
|
|||
|
||||
~SortedMap();
|
||||
|
||||
// The ownership for the returned SortedMap stays with the caller
|
||||
static std::unique_ptr<SortedMap> FromListPack(PMR_NS::memory_resource* res, const uint8_t* lp);
|
||||
// The ownership for the returned SortedMap stays with the caller, and must be freed via
|
||||
// placement delete and then res->deallocate().
|
||||
static SortedMap* FromListPack(PMR_NS::memory_resource* res, const uint8_t* lp);
|
||||
|
||||
size_t Size() const {
|
||||
return std::visit(Overload{[](const auto& impl) { return impl.Size(); }}, impl_);
|
||||
|
|
|
@ -636,8 +636,7 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
|
|||
stats->listpack_blob_cnt++;
|
||||
stats->listpack_bytes += lpBytes(lp);
|
||||
} else {
|
||||
StringMap* sm = new StringMap(CompactObj::memory_resource());
|
||||
pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
|
||||
pv.InitRobj(OBJ_HASH, kEncodingStrMap2, CompactObj::AllocateMR<StringMap>());
|
||||
}
|
||||
} else {
|
||||
if (pv.ObjType() != OBJ_HASH)
|
||||
|
@ -1202,7 +1201,7 @@ void HSetFamily::Register(CommandRegistry* registry) {
|
|||
}
|
||||
|
||||
StringMap* HSetFamily::ConvertToStrMap(uint8_t* lp) {
|
||||
StringMap* sm = new StringMap(CompactObj::memory_resource());
|
||||
StringMap* sm = CompactObj::AllocateMR<StringMap>();
|
||||
size_t lplen = lpLength(lp);
|
||||
if (lplen == 0)
|
||||
return sm;
|
||||
|
|
|
@ -510,7 +510,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
|
|||
bool use_set2 = GetFlag(FLAGS_use_set2);
|
||||
|
||||
if (use_set2) {
|
||||
StringSet* set = new StringSet{CompactObj::memory_resource()};
|
||||
StringSet* set = CompactObj::AllocateMR<StringSet>();
|
||||
set->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
|
||||
res = createObject(OBJ_SET, set);
|
||||
res->encoding = OBJ_ENCODING_HT;
|
||||
|
@ -641,10 +641,10 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
|
|||
lp = lpShrinkToFit(lp);
|
||||
pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
|
||||
} else {
|
||||
StringMap* string_map = new StringMap;
|
||||
StringMap* string_map = CompactObj::AllocateMR<StringMap>();
|
||||
string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
|
||||
|
||||
auto cleanup = absl::MakeCleanup([&] { delete string_map; });
|
||||
auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR<StringMap>(string_map); });
|
||||
std::string key;
|
||||
string_map->Reserve(len);
|
||||
for (const auto& seg : ltrace->arr) {
|
||||
|
@ -761,9 +761,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
|
|||
|
||||
void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
|
||||
size_t zsetlen = ltrace->blob_count();
|
||||
detail::SortedMap* zs = new detail::SortedMap(CompactObj::memory_resource());
|
||||
detail::SortedMap* zs = CompactObj::AllocateMR<detail::SortedMap>();
|
||||
unsigned encoding = OBJ_ENCODING_SKIPLIST;
|
||||
auto cleanup = absl::MakeCleanup([&] { delete zs; });
|
||||
auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR<detail::SortedMap>(zs); });
|
||||
|
||||
if (zsetlen > DICT_HT_INITIAL_SIZE && !zs->Reserve(zsetlen)) {
|
||||
LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
|
||||
|
@ -803,7 +803,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
|
|||
maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) {
|
||||
encoding = OBJ_ENCODING_LISTPACK;
|
||||
inner = zs->ToListPack();
|
||||
delete zs;
|
||||
CompactObj::DeleteMR<detail::SortedMap>(zs);
|
||||
}
|
||||
|
||||
std::move(cleanup).Cancel();
|
||||
|
@ -977,7 +977,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
|
|||
const bool use_set2 = GetFlag(FLAGS_use_set2);
|
||||
robj* res = nullptr;
|
||||
if (use_set2) {
|
||||
StringSet* set = new StringSet{CompactObj::memory_resource()};
|
||||
StringSet* set = CompactObj::AllocateMR<StringSet>();
|
||||
res = createObject(OBJ_SET, set);
|
||||
res->encoding = OBJ_ENCODING_HT;
|
||||
auto f = [this, res](unsigned char* val, unsigned int slen, long long lval) {
|
||||
|
@ -1060,7 +1060,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
|
|||
unsigned encoding = OBJ_ENCODING_LISTPACK;
|
||||
void* inner;
|
||||
if (lpBytes(lp) >= server.max_listpack_map_bytes) {
|
||||
inner = detail::SortedMap::FromListPack(CompactObj::memory_resource(), lp).release();
|
||||
inner = detail::SortedMap::FromListPack(CompactObj::memory_resource(), lp);
|
||||
lpFree(lp);
|
||||
encoding = OBJ_ENCODING_SKIPLIST;
|
||||
} else {
|
||||
|
|
|
@ -154,8 +154,7 @@ unsigned AddStrSet(const DbContext& db_context, ArgSlice vals, uint32_t ttl_sec,
|
|||
|
||||
void InitStrSet(CompactObj* set) {
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
StringSet* ss = new StringSet{CompactObj::memory_resource()};
|
||||
set->InitRobj(OBJ_SET, kEncodingStrMap2, ss);
|
||||
set->InitRobj(OBJ_SET, kEncodingStrMap2, CompactObj::AllocateMR<StringSet>());
|
||||
} else {
|
||||
dict* ds = dictCreate(&setDictType);
|
||||
set->InitRobj(OBJ_SET, kEncodingStrMap, ds);
|
||||
|
@ -1637,7 +1636,7 @@ bool SetFamily::ConvertToStrSet(const intset* is, size_t expected_len, robj* des
|
|||
int ii = 0;
|
||||
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
StringSet* ss = new StringSet{CompactObj::memory_resource()};
|
||||
StringSet* ss = CompactObj::AllocateMR<StringSet>();
|
||||
if (expected_len) {
|
||||
ss->Reserve(expected_len);
|
||||
}
|
||||
|
|
|
@ -202,8 +202,7 @@ OpResult<DbSlice::ItAndUpdater> FindZEntry(const ZParams& zparams, const OpArgs&
|
|||
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
|
||||
if (add_res.is_new || zparams.override) {
|
||||
if (member_len > server.max_map_field_len) {
|
||||
detail::SortedMap* zs = new detail::SortedMap(CompactObj::memory_resource());
|
||||
pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_SKIPLIST, zs);
|
||||
pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_SKIPLIST, CompactObj::AllocateMR<detail::SortedMap>());
|
||||
} else {
|
||||
unsigned char* lp = lpNew(0);
|
||||
pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lp);
|
||||
|
|
Loading…
Reference in a new issue