mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat(server): integrate string_map into rdb code to support save/load
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
69b00b9cdd
commit
bbc809a637
9 changed files with 92 additions and 125 deletions
|
@ -548,11 +548,7 @@ void CompactObj::ImportRObj(robj* o) {
|
|||
enc = GetFlag(FLAGS_use_set2) ? kEncodingStrMap2 : kEncodingStrMap;
|
||||
}
|
||||
} else if (o->type == OBJ_HASH) {
|
||||
if (o->encoding == OBJ_ENCODING_HT) {
|
||||
enc = kEncodingStrMap2;
|
||||
} else {
|
||||
enc = kEncodingListPack;
|
||||
}
|
||||
LOG(FATAL) << "Should not reach";
|
||||
}
|
||||
u_.r_obj.Init(type, enc, o->ptr);
|
||||
if (o->refcount == 1)
|
||||
|
@ -564,15 +560,14 @@ robj* CompactObj::AsRObj() const {
|
|||
CHECK_EQ(ROBJ_TAG, taglen_);
|
||||
|
||||
robj* res = &tl.tmp_robj;
|
||||
unsigned enc = u_.r_obj.encoding();
|
||||
res->type = u_.r_obj.type();
|
||||
|
||||
if (res->type == OBJ_SET) {
|
||||
if (res->type == OBJ_SET || res->type == OBJ_HASH) {
|
||||
LOG(DFATAL) << "Should not call AsRObj for type " << res->type;
|
||||
}
|
||||
|
||||
if (res->type == OBJ_HASH) {
|
||||
LOG(DFATAL) << "Should not call AsRObj for type " << res->type;
|
||||
}
|
||||
res->encoding = enc;
|
||||
res->lru = 0; // u_.r_obj.unneeded;
|
||||
res->ptr = u_.r_obj.inner_obj();
|
||||
|
||||
|
|
|
@ -229,23 +229,6 @@ TEST_F(CompactObjectTest, IntSet) {
|
|||
EXPECT_GT(cobj_.MallocUsed(), 0);
|
||||
}
|
||||
|
||||
TEST_F(CompactObjectTest, HSet) {
|
||||
robj* src = createHashObject();
|
||||
cobj_.ImportRObj(src);
|
||||
|
||||
EXPECT_EQ(OBJ_HASH, cobj_.ObjType());
|
||||
EXPECT_EQ(kEncodingListPack, cobj_.Encoding());
|
||||
|
||||
robj* os = cobj_.AsRObj();
|
||||
|
||||
sds key1 = sdsnew("key1");
|
||||
sds val1 = sdsnew("val1");
|
||||
|
||||
// returns 0 on insert.
|
||||
EXPECT_EQ(0, hashTypeSet(os, key1, val1, HASH_SET_TAKE_FIELD | HASH_SET_TAKE_VALUE));
|
||||
cobj_.SyncRObj();
|
||||
}
|
||||
|
||||
TEST_F(CompactObjectTest, ZSet) {
|
||||
// unrelated, checking that sds static encoding works.
|
||||
// it is used in zset special strings.
|
||||
|
|
|
@ -249,11 +249,7 @@ class DenseSet {
|
|||
// Returns null if obj was added.
|
||||
void* AddOrFind(void* obj, bool has_ttl);
|
||||
|
||||
void* FindInternal(const void* obj, uint32_t cookie) const {
|
||||
DensePtr* ptr = const_cast<DenseSet*>(this)->Find(obj, BucketId(obj, cookie), cookie).second;
|
||||
return ptr ? ptr->GetObject() : nullptr;
|
||||
}
|
||||
|
||||
void* FindInternal(const void* obj, uint32_t cookie) const;
|
||||
void* PopInternal();
|
||||
|
||||
// Note this does not free any dynamic allocations done by derived classes, that a DensePtr
|
||||
|
@ -331,4 +327,12 @@ class DenseSet {
|
|||
uint32_t time_now_ = 0;
|
||||
};
|
||||
|
||||
inline void* DenseSet::FindInternal(const void* obj, uint32_t cookie) const {
|
||||
if (entries_.empty())
|
||||
return nullptr;
|
||||
|
||||
DensePtr* ptr = const_cast<DenseSet*>(this)->Find(obj, BucketId(obj, cookie), cookie).second;
|
||||
return ptr ? ptr->GetObject() : nullptr;
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -92,4 +92,8 @@ TEST_F(StringMapTest, Basic) {
|
|||
EXPECT_STREQ("baraaaaaaaaaaaa2", it->second);
|
||||
}
|
||||
|
||||
TEST_F(StringMapTest, EmptyFind) {
|
||||
sm_->Find("bar");
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -117,31 +117,6 @@ pair<uint8_t*, bool> LpInsert(uint8_t* lp, string_view field, string_view val, b
|
|||
return make_pair(lp, !updated);
|
||||
}
|
||||
|
||||
StringMap* ConvertToStrMap(uint8_t* lp) {
|
||||
StringMap* sm = new StringMap(CompactObj::memory_resource());
|
||||
size_t lplen = lpLength(lp);
|
||||
if (lplen == 0)
|
||||
return sm;
|
||||
|
||||
sm->Reserve(lplen / 2);
|
||||
|
||||
uint8_t* lp_elem = lpFirst(lp);
|
||||
uint8_t intbuf[LP_INTBUF_SIZE];
|
||||
|
||||
DCHECK(lp_elem); // empty containers are not allowed.
|
||||
|
||||
do {
|
||||
string_view key = LpGetView(lp_elem, intbuf);
|
||||
lp_elem = lpNext(lp, lp_elem); // switch to value
|
||||
DCHECK(lp_elem);
|
||||
string_view value = LpGetView(lp_elem, intbuf);
|
||||
lp_elem = lpNext(lp, lp_elem); // switch to next key
|
||||
CHECK(sm->AddOrUpdate(key, value)); // must be unique
|
||||
} while (lp_elem);
|
||||
|
||||
return sm;
|
||||
}
|
||||
|
||||
size_t HMapLength(const CompactObj& co) {
|
||||
void* ptr = co.RObjPtr();
|
||||
if (co.Encoding() == kEncodingStrMap2) {
|
||||
|
@ -217,7 +192,7 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
|
|||
|
||||
if (lpb >= kMaxListPackLen) {
|
||||
stats->listpack_blob_cnt--;
|
||||
StringMap* sm = ConvertToStrMap(lp);
|
||||
StringMap* sm = HSetFamily::ConvertToStrMap(lp);
|
||||
pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
|
||||
}
|
||||
}
|
||||
|
@ -651,7 +626,7 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
|
|||
|
||||
if (!IsGoodForListpack(values, lp)) {
|
||||
stats->listpack_blob_cnt--;
|
||||
StringMap* sm = ConvertToStrMap(lp);
|
||||
StringMap* sm = HSetFamily::ConvertToStrMap(lp);
|
||||
pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
|
||||
lp = nullptr;
|
||||
}
|
||||
|
@ -1052,4 +1027,28 @@ uint32_t HSetFamily::MaxListPackLen() {
|
|||
return kMaxListPackLen;
|
||||
}
|
||||
|
||||
StringMap* HSetFamily::ConvertToStrMap(uint8_t* lp) {
|
||||
StringMap* sm = new StringMap(CompactObj::memory_resource());
|
||||
size_t lplen = lpLength(lp);
|
||||
if (lplen == 0)
|
||||
return sm;
|
||||
|
||||
sm->Reserve(lplen / 2);
|
||||
|
||||
uint8_t* lp_elem = lpFirst(lp);
|
||||
uint8_t intbuf[LP_INTBUF_SIZE];
|
||||
|
||||
DCHECK(lp_elem); // empty containers are not allowed.
|
||||
|
||||
do {
|
||||
string_view key = LpGetView(lp_elem, intbuf);
|
||||
lp_elem = lpNext(lp, lp_elem); // switch to value
|
||||
DCHECK(lp_elem);
|
||||
string_view value = LpGetView(lp_elem, intbuf);
|
||||
lp_elem = lpNext(lp, lp_elem); // switch to next key
|
||||
CHECK(sm->AddOrUpdate(key, value)); // must be unique
|
||||
} while (lp_elem);
|
||||
|
||||
return sm;
|
||||
}
|
||||
} // namespace dfly
|
||||
|
|
|
@ -13,6 +13,8 @@ namespace dfly {
|
|||
|
||||
class ConnectionContext;
|
||||
class CommandRegistry;
|
||||
class StringMap;
|
||||
|
||||
using facade::OpResult;
|
||||
using facade::OpStatus;
|
||||
|
||||
|
@ -21,8 +23,10 @@ class HSetFamily {
|
|||
static void Register(CommandRegistry* registry);
|
||||
static uint32_t MaxListPackLen();
|
||||
|
||||
private:
|
||||
// Does not free lp.
|
||||
static StringMap* ConvertToStrMap(uint8_t* lp);
|
||||
|
||||
private:
|
||||
static void HDel(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HLen(CmdArgList args, ConnectionContext* cntx);
|
||||
static void HExists(CmdArgList args, ConnectionContext* cntx);
|
||||
|
|
|
@ -24,6 +24,7 @@ extern "C" {
|
|||
#include "base/endian.h"
|
||||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "core/string_map.h"
|
||||
#include "core/string_set.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
|
@ -534,8 +535,6 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
|
|||
}
|
||||
}
|
||||
|
||||
robj* res = nullptr;
|
||||
|
||||
if (keep_lp) {
|
||||
uint8_t* lp = lpNew(lp_size);
|
||||
|
||||
|
@ -554,44 +553,30 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
|
|||
}
|
||||
|
||||
lp = lpShrinkToFit(lp);
|
||||
res = createObject(OBJ_HASH, lp);
|
||||
res->encoding = OBJ_ENCODING_LISTPACK;
|
||||
pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
|
||||
} else {
|
||||
dict* hmap = dictCreate(&hashDictType);
|
||||
|
||||
auto cleanup = absl::MakeCleanup([&] { dictRelease(hmap); });
|
||||
|
||||
if (len > DICT_HT_INITIAL_SIZE) {
|
||||
if (dictTryExpand(hmap, len) != DICT_OK) {
|
||||
LOG(ERROR) << "OOM in dictTryExpand " << len;
|
||||
ec_ = RdbError(errc::out_of_memory);
|
||||
return;
|
||||
}
|
||||
}
|
||||
StringMap* string_map = new StringMap;
|
||||
|
||||
auto cleanup = absl::MakeCleanup([&] { delete string_map; });
|
||||
string_map->Reserve(len);
|
||||
for (size_t i = 0; i < len; ++i) {
|
||||
sds key = ToSds(ltrace->arr[i * 2].rdb_var);
|
||||
sds val = ToSds(ltrace->arr[i * 2 + 1].rdb_var);
|
||||
// ToSV may reference an internal buffer, therefore we can use only before the
|
||||
// next call to ToSV. To workaround, I copy the key to string.
|
||||
string key(ToSV(ltrace->arr[i * 2].rdb_var));
|
||||
string_view val = ToSV(ltrace->arr[i * 2 + 1].rdb_var);
|
||||
|
||||
if (!key || !val)
|
||||
if (ec_)
|
||||
return;
|
||||
|
||||
/* Add pair to hash table */
|
||||
int ret = dictAdd(hmap, key, val);
|
||||
if (ret == DICT_ERR) {
|
||||
if (!string_map->AddOrSkip(key, val)) {
|
||||
LOG(ERROR) << "Duplicate hash fields detected";
|
||||
ec_ = RdbError(errc::rdb_file_corrupted);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
res = createObject(OBJ_HASH, hmap);
|
||||
res->encoding = OBJ_ENCODING_HT;
|
||||
pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map);
|
||||
std::move(cleanup).Cancel();
|
||||
}
|
||||
|
||||
DCHECK(res);
|
||||
pv_->ImportRObj(res);
|
||||
}
|
||||
|
||||
void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
|
||||
|
@ -871,13 +856,15 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
|
|||
return;
|
||||
}
|
||||
|
||||
res = createObject(OBJ_HASH, lp);
|
||||
res->encoding = OBJ_ENCODING_LISTPACK;
|
||||
|
||||
if (lpBytes(lp) > HSetFamily::MaxListPackLen())
|
||||
hashTypeConvert(res, OBJ_ENCODING_HT);
|
||||
else
|
||||
res->ptr = lpShrinkToFit((uint8_t*)res->ptr);
|
||||
if (lpBytes(lp) > HSetFamily::MaxListPackLen()) {
|
||||
StringMap* sm = HSetFamily::ConvertToStrMap(lp);
|
||||
lpFree(lp);
|
||||
pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
|
||||
} else {
|
||||
lp = lpShrinkToFit(lp);
|
||||
pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
|
||||
}
|
||||
return;
|
||||
} else if (rdb_type_ == RDB_TYPE_ZSET_ZIPLIST) {
|
||||
unsigned char* lp = lpNew(blob.size());
|
||||
if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)blob.data(), blob.size(), &lp)) {
|
||||
|
|
|
@ -10,8 +10,6 @@
|
|||
#include <lz4frame.h>
|
||||
#include <zstd.h>
|
||||
|
||||
#include "core/string_set.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/intset.h"
|
||||
#include "redis/listpack.h"
|
||||
|
@ -25,6 +23,8 @@ extern "C" {
|
|||
|
||||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "core/string_map.h"
|
||||
#include "core/string_set.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
#include "server/journal/serializer.h"
|
||||
|
@ -103,30 +103,30 @@ constexpr size_t kAmask = 4_KB - 1;
|
|||
|
||||
} // namespace
|
||||
|
||||
uint8_t RdbObjectType(unsigned type, unsigned encoding) {
|
||||
uint8_t RdbObjectType(unsigned type, unsigned compact_enc) {
|
||||
switch (type) {
|
||||
case OBJ_STRING:
|
||||
return RDB_TYPE_STRING;
|
||||
case OBJ_LIST:
|
||||
if (encoding == OBJ_ENCODING_QUICKLIST)
|
||||
if (compact_enc == OBJ_ENCODING_QUICKLIST)
|
||||
return RDB_TYPE_LIST_QUICKLIST;
|
||||
break;
|
||||
case OBJ_SET:
|
||||
if (encoding == kEncodingIntSet)
|
||||
if (compact_enc == kEncodingIntSet)
|
||||
return RDB_TYPE_SET_INTSET;
|
||||
else if (encoding == kEncodingStrMap || encoding == kEncodingStrMap2)
|
||||
else if (compact_enc == kEncodingStrMap || compact_enc == kEncodingStrMap2)
|
||||
return RDB_TYPE_SET;
|
||||
break;
|
||||
case OBJ_ZSET:
|
||||
if (encoding == OBJ_ENCODING_LISTPACK)
|
||||
if (compact_enc == OBJ_ENCODING_LISTPACK)
|
||||
return RDB_TYPE_ZSET_ZIPLIST; // we save using the old ziplist encoding.
|
||||
else if (encoding == OBJ_ENCODING_SKIPLIST)
|
||||
else if (compact_enc == OBJ_ENCODING_SKIPLIST)
|
||||
return RDB_TYPE_ZSET_2;
|
||||
break;
|
||||
case OBJ_HASH:
|
||||
if (encoding == kEncodingListPack)
|
||||
if (compact_enc == kEncodingListPack)
|
||||
return RDB_TYPE_HASH_ZIPLIST;
|
||||
else if (encoding == kEncodingStrMap)
|
||||
else if (compact_enc == kEncodingStrMap2)
|
||||
return RDB_TYPE_HASH;
|
||||
break;
|
||||
case OBJ_STREAM:
|
||||
|
@ -134,7 +134,7 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) {
|
|||
case OBJ_MODULE:
|
||||
return RDB_TYPE_MODULE_2;
|
||||
}
|
||||
LOG(FATAL) << "Unknown encoding " << encoding << " for type " << type;
|
||||
LOG(FATAL) << "Unknown encoding " << compact_enc << " for type " << type;
|
||||
return 0; /* avoid warning */
|
||||
}
|
||||
|
||||
|
@ -304,7 +304,7 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
|
|||
}
|
||||
|
||||
if (obj_type == OBJ_HASH) {
|
||||
return SaveHSetObject(pv.AsRObj());
|
||||
return SaveHSetObject(pv);
|
||||
}
|
||||
|
||||
if (obj_type == OBJ_ZSET) {
|
||||
|
@ -402,31 +402,22 @@ error_code RdbSerializer::SaveSetObject(const PrimeValue& obj) {
|
|||
return error_code{};
|
||||
}
|
||||
|
||||
error_code RdbSerializer::SaveHSetObject(const robj* obj) {
|
||||
DCHECK_EQ(OBJ_HASH, obj->type);
|
||||
if (obj->encoding == OBJ_ENCODING_HT) {
|
||||
dict* set = (dict*)obj->ptr;
|
||||
error_code RdbSerializer::SaveHSetObject(const PrimeValue& pv) {
|
||||
DCHECK_EQ(OBJ_HASH, pv.ObjType());
|
||||
|
||||
RETURN_ON_ERR(SaveLen(dictSize(set)));
|
||||
if (pv.Encoding() == kEncodingStrMap2) {
|
||||
StringMap* string_map = (StringMap*)pv.RObjPtr();
|
||||
|
||||
dictIterator* di = dictGetIterator(set);
|
||||
dictEntry* de;
|
||||
auto cleanup = absl::MakeCleanup([di] { dictReleaseIterator(di); });
|
||||
RETURN_ON_ERR(SaveLen(string_map->Size()));
|
||||
|
||||
while ((de = dictNext(di)) != NULL) {
|
||||
sds key = (sds)de->key;
|
||||
sds value = (sds)de->v.val;
|
||||
|
||||
RETURN_ON_ERR(SaveString(string_view{key, sdslen(key)}));
|
||||
RETURN_ON_ERR(SaveString(string_view{value, sdslen(value)}));
|
||||
for (const auto& k_v : *string_map) {
|
||||
RETURN_ON_ERR(SaveString(string_view{k_v.first, sdslen(k_v.first)}));
|
||||
RETURN_ON_ERR(SaveString(string_view{k_v.second, sdslen(k_v.second)}));
|
||||
}
|
||||
} else {
|
||||
CHECK_EQ(unsigned(OBJ_ENCODING_LISTPACK), obj->encoding);
|
||||
|
||||
uint8_t* lp = (uint8_t*)obj->ptr;
|
||||
size_t lplen = lpLength(lp);
|
||||
CHECK(lplen > 0 && lplen % 2 == 0); // has (key,value) pairs.
|
||||
CHECK_EQ(kEncodingListPack, pv.Encoding());
|
||||
|
||||
uint8_t* lp = (uint8_t*)pv.RObjPtr();
|
||||
RETURN_ON_ERR(SaveListPackAsZiplist(lp));
|
||||
}
|
||||
|
||||
|
|
|
@ -156,7 +156,7 @@ class RdbSerializer {
|
|||
std::error_code SaveObject(const PrimeValue& pv);
|
||||
std::error_code SaveListObject(const robj* obj);
|
||||
std::error_code SaveSetObject(const PrimeValue& pv);
|
||||
std::error_code SaveHSetObject(const robj* obj);
|
||||
std::error_code SaveHSetObject(const PrimeValue& pv);
|
||||
std::error_code SaveZSetObject(const robj* obj);
|
||||
std::error_code SaveStreamObject(const robj* obj);
|
||||
std::error_code SaveLongLongAsString(int64_t value);
|
||||
|
|
Loading…
Reference in a new issue