From bbc809a6370391cb45b1746eeee652040e397c1e Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 23 Dec 2022 20:31:25 +0200 Subject: [PATCH] feat(server): integrate string_map into rdb code to support save/load Signed-off-by: Roman Gershman --- src/core/compact_object.cc | 13 +++----- src/core/compact_object_test.cc | 17 ---------- src/core/dense_set.h | 14 ++++++--- src/core/string_map_test.cc | 4 +++ src/server/hset_family.cc | 53 ++++++++++++++++--------------- src/server/hset_family.h | 6 +++- src/server/rdb_load.cc | 55 +++++++++++++-------------------- src/server/rdb_save.cc | 53 +++++++++++++------------------ src/server/rdb_save.h | 2 +- 9 files changed, 92 insertions(+), 125 deletions(-) diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 573c1101c..01e6cd10c 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -548,11 +548,7 @@ void CompactObj::ImportRObj(robj* o) { enc = GetFlag(FLAGS_use_set2) ? kEncodingStrMap2 : kEncodingStrMap; } } else if (o->type == OBJ_HASH) { - if (o->encoding == OBJ_ENCODING_HT) { - enc = kEncodingStrMap2; - } else { - enc = kEncodingListPack; - } + LOG(FATAL) << "Should not reach"; } u_.r_obj.Init(type, enc, o->ptr); if (o->refcount == 1) @@ -564,15 +560,14 @@ robj* CompactObj::AsRObj() const { CHECK_EQ(ROBJ_TAG, taglen_); robj* res = &tl.tmp_robj; + unsigned enc = u_.r_obj.encoding(); res->type = u_.r_obj.type(); - if (res->type == OBJ_SET) { + if (res->type == OBJ_SET || res->type == OBJ_HASH) { LOG(DFATAL) << "Should not call AsRObj for type " << res->type; } - if (res->type == OBJ_HASH) { - LOG(DFATAL) << "Should not call AsRObj for type " << res->type; - } + res->encoding = enc; res->lru = 0; // u_.r_obj.unneeded; res->ptr = u_.r_obj.inner_obj(); diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc index 06003f2c9..74d6a507b 100644 --- a/src/core/compact_object_test.cc +++ b/src/core/compact_object_test.cc @@ -229,23 +229,6 @@ 
TEST_F(CompactObjectTest, IntSet) { EXPECT_GT(cobj_.MallocUsed(), 0); } -TEST_F(CompactObjectTest, HSet) { - robj* src = createHashObject(); - cobj_.ImportRObj(src); - - EXPECT_EQ(OBJ_HASH, cobj_.ObjType()); - EXPECT_EQ(kEncodingListPack, cobj_.Encoding()); - - robj* os = cobj_.AsRObj(); - - sds key1 = sdsnew("key1"); - sds val1 = sdsnew("val1"); - - // returns 0 on insert. - EXPECT_EQ(0, hashTypeSet(os, key1, val1, HASH_SET_TAKE_FIELD | HASH_SET_TAKE_VALUE)); - cobj_.SyncRObj(); -} - TEST_F(CompactObjectTest, ZSet) { // unrelated, checking that sds static encoding works. // it is used in zset special strings. diff --git a/src/core/dense_set.h b/src/core/dense_set.h index 49084817b..1cf347b32 100644 --- a/src/core/dense_set.h +++ b/src/core/dense_set.h @@ -249,11 +249,7 @@ class DenseSet { // Returns null if obj was added. void* AddOrFind(void* obj, bool has_ttl); - void* FindInternal(const void* obj, uint32_t cookie) const { - DensePtr* ptr = const_cast(this)->Find(obj, BucketId(obj, cookie), cookie).second; - return ptr ? ptr->GetObject() : nullptr; - } - + void* FindInternal(const void* obj, uint32_t cookie) const; void* PopInternal(); // Note this does not free any dynamic allocations done by derived classes, that a DensePtr @@ -331,4 +327,12 @@ class DenseSet { uint32_t time_now_ = 0; }; +inline void* DenseSet::FindInternal(const void* obj, uint32_t cookie) const { + if (entries_.empty()) + return nullptr; + + DensePtr* ptr = const_cast(this)->Find(obj, BucketId(obj, cookie), cookie).second; + return ptr ? 
ptr->GetObject() : nullptr; +} + } // namespace dfly diff --git a/src/core/string_map_test.cc b/src/core/string_map_test.cc index f6bfc3b22..46229e2c8 100644 --- a/src/core/string_map_test.cc +++ b/src/core/string_map_test.cc @@ -92,4 +92,8 @@ TEST_F(StringMapTest, Basic) { EXPECT_STREQ("baraaaaaaaaaaaa2", it->second); } +TEST_F(StringMapTest, EmptyFind) { + sm_->Find("bar"); +} + } // namespace dfly diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index bb134059f..3af4f1a40 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -117,31 +117,6 @@ pair LpInsert(uint8_t* lp, string_view field, string_view val, b return make_pair(lp, !updated); } -StringMap* ConvertToStrMap(uint8_t* lp) { - StringMap* sm = new StringMap(CompactObj::memory_resource()); - size_t lplen = lpLength(lp); - if (lplen == 0) - return sm; - - sm->Reserve(lplen / 2); - - uint8_t* lp_elem = lpFirst(lp); - uint8_t intbuf[LP_INTBUF_SIZE]; - - DCHECK(lp_elem); // empty containers are not allowed. 
- - do { - string_view key = LpGetView(lp_elem, intbuf); - lp_elem = lpNext(lp, lp_elem); // switch to value - DCHECK(lp_elem); - string_view value = LpGetView(lp_elem, intbuf); - lp_elem = lpNext(lp, lp_elem); // switch to next key - CHECK(sm->AddOrUpdate(key, value)); // must be unique - } while (lp_elem); - - return sm; -} - size_t HMapLength(const CompactObj& co) { void* ptr = co.RObjPtr(); if (co.Encoding() == kEncodingStrMap2) { @@ -217,7 +192,7 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc if (lpb >= kMaxListPackLen) { stats->listpack_blob_cnt--; - StringMap* sm = ConvertToStrMap(lp); + StringMap* sm = HSetFamily::ConvertToStrMap(lp); pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm); } } @@ -651,7 +626,7 @@ OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList valu if (!IsGoodForListpack(values, lp)) { stats->listpack_blob_cnt--; - StringMap* sm = ConvertToStrMap(lp); + StringMap* sm = HSetFamily::ConvertToStrMap(lp); pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm); lp = nullptr; } @@ -1052,4 +1027,28 @@ uint32_t HSetFamily::MaxListPackLen() { return kMaxListPackLen; } +StringMap* HSetFamily::ConvertToStrMap(uint8_t* lp) { + StringMap* sm = new StringMap(CompactObj::memory_resource()); + size_t lplen = lpLength(lp); + if (lplen == 0) + return sm; + + sm->Reserve(lplen / 2); + + uint8_t* lp_elem = lpFirst(lp); + uint8_t intbuf[LP_INTBUF_SIZE]; + + DCHECK(lp_elem); // empty containers are not allowed. 
+ + do { + string_view key = LpGetView(lp_elem, intbuf); + lp_elem = lpNext(lp, lp_elem); // switch to value + DCHECK(lp_elem); + string_view value = LpGetView(lp_elem, intbuf); + lp_elem = lpNext(lp, lp_elem); // switch to next key + CHECK(sm->AddOrUpdate(key, value)); // must be unique + } while (lp_elem); + + return sm; +} } // namespace dfly diff --git a/src/server/hset_family.h b/src/server/hset_family.h index e9fbcf2a0..a03e64b49 100644 --- a/src/server/hset_family.h +++ b/src/server/hset_family.h @@ -13,6 +13,8 @@ namespace dfly { class ConnectionContext; class CommandRegistry; +class StringMap; + using facade::OpResult; using facade::OpStatus; @@ -21,8 +23,10 @@ class HSetFamily { static void Register(CommandRegistry* registry); static uint32_t MaxListPackLen(); - private: + // Does not free lp. + static StringMap* ConvertToStrMap(uint8_t* lp); + private: static void HDel(CmdArgList args, ConnectionContext* cntx); static void HLen(CmdArgList args, ConnectionContext* cntx); static void HExists(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 77cacf892..46adcfaa7 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -24,6 +24,7 @@ extern "C" { #include "base/endian.h" #include "base/flags.h" #include "base/logging.h" +#include "core/string_map.h" #include "core/string_set.h" #include "server/engine_shard_set.h" #include "server/error.h" @@ -534,8 +535,6 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) { } } - robj* res = nullptr; - if (keep_lp) { uint8_t* lp = lpNew(lp_size); @@ -554,44 +553,30 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) { } lp = lpShrinkToFit(lp); - res = createObject(OBJ_HASH, lp); - res->encoding = OBJ_ENCODING_LISTPACK; + pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp); } else { - dict* hmap = dictCreate(&hashDictType); - - auto cleanup = absl::MakeCleanup([&] { dictRelease(hmap); }); - - if (len > 
DICT_HT_INITIAL_SIZE) { - if (dictTryExpand(hmap, len) != DICT_OK) { - LOG(ERROR) << "OOM in dictTryExpand " << len; - ec_ = RdbError(errc::out_of_memory); - return; - } - } + StringMap* string_map = new StringMap; + auto cleanup = absl::MakeCleanup([&] { delete string_map; }); + string_map->Reserve(len); for (size_t i = 0; i < len; ++i) { - sds key = ToSds(ltrace->arr[i * 2].rdb_var); - sds val = ToSds(ltrace->arr[i * 2 + 1].rdb_var); + // ToSV may reference an internal buffer, so its result is valid only until the + // next call to ToSV. As a workaround, copy the key into a string. + string key(ToSV(ltrace->arr[i * 2].rdb_var)); + string_view val = ToSV(ltrace->arr[i * 2 + 1].rdb_var); - if (!key || !val) + if (ec_) return; - /* Add pair to hash table */ - int ret = dictAdd(hmap, key, val); - if (ret == DICT_ERR) { + if (!string_map->AddOrSkip(key, val)) { LOG(ERROR) << "Duplicate hash fields detected"; ec_ = RdbError(errc::rdb_file_corrupted); return; } } - - res = createObject(OBJ_HASH, hmap); - res->encoding = OBJ_ENCODING_HT; + pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map); std::move(cleanup).Cancel(); } - - DCHECK(res); - pv_->ImportRObj(res); } void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) { @@ -871,13 +856,15 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { return; } - res = createObject(OBJ_HASH, lp); - res->encoding = OBJ_ENCODING_LISTPACK; - - if (lpBytes(lp) > HSetFamily::MaxListPackLen()) - hashTypeConvert(res, OBJ_ENCODING_HT); - else - res->ptr = lpShrinkToFit((uint8_t*)res->ptr); + if (lpBytes(lp) > HSetFamily::MaxListPackLen()) { + StringMap* sm = HSetFamily::ConvertToStrMap(lp); + lpFree(lp); + pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, sm); + } else { + lp = lpShrinkToFit(lp); + pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp); + } + return; } else if (rdb_type_ == RDB_TYPE_ZSET_ZIPLIST) { unsigned char* lp = lpNew(blob.size()); if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)blob.data(), 
blob.size(), &lp)) { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 970d8564b..be61f401b 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -10,8 +10,6 @@ #include #include -#include "core/string_set.h" - extern "C" { #include "redis/intset.h" #include "redis/listpack.h" @@ -25,6 +23,8 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" +#include "core/string_map.h" +#include "core/string_set.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/serializer.h" @@ -103,30 +103,30 @@ constexpr size_t kAmask = 4_KB - 1; } // namespace -uint8_t RdbObjectType(unsigned type, unsigned encoding) { +uint8_t RdbObjectType(unsigned type, unsigned compact_enc) { switch (type) { case OBJ_STRING: return RDB_TYPE_STRING; case OBJ_LIST: - if (encoding == OBJ_ENCODING_QUICKLIST) + if (compact_enc == OBJ_ENCODING_QUICKLIST) return RDB_TYPE_LIST_QUICKLIST; break; case OBJ_SET: - if (encoding == kEncodingIntSet) + if (compact_enc == kEncodingIntSet) return RDB_TYPE_SET_INTSET; - else if (encoding == kEncodingStrMap || encoding == kEncodingStrMap2) + else if (compact_enc == kEncodingStrMap || compact_enc == kEncodingStrMap2) return RDB_TYPE_SET; break; case OBJ_ZSET: - if (encoding == OBJ_ENCODING_LISTPACK) + if (compact_enc == OBJ_ENCODING_LISTPACK) return RDB_TYPE_ZSET_ZIPLIST; // we save using the old ziplist encoding. 
- else if (encoding == OBJ_ENCODING_SKIPLIST) + else if (compact_enc == OBJ_ENCODING_SKIPLIST) return RDB_TYPE_ZSET_2; break; case OBJ_HASH: - if (encoding == kEncodingListPack) + if (compact_enc == kEncodingListPack) return RDB_TYPE_HASH_ZIPLIST; - else if (encoding == kEncodingStrMap) + else if (compact_enc == kEncodingStrMap2) return RDB_TYPE_HASH; break; case OBJ_STREAM: @@ -134,7 +134,7 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) { case OBJ_MODULE: return RDB_TYPE_MODULE_2; } - LOG(FATAL) << "Unknown encoding " << encoding << " for type " << type; + LOG(FATAL) << "Unknown encoding " << compact_enc << " for type " << type; return 0; /* avoid warning */ } @@ -304,7 +304,7 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) { } if (obj_type == OBJ_HASH) { - return SaveHSetObject(pv.AsRObj()); + return SaveHSetObject(pv); } if (obj_type == OBJ_ZSET) { @@ -402,31 +402,22 @@ error_code RdbSerializer::SaveSetObject(const PrimeValue& obj) { return error_code{}; } -error_code RdbSerializer::SaveHSetObject(const robj* obj) { - DCHECK_EQ(OBJ_HASH, obj->type); - if (obj->encoding == OBJ_ENCODING_HT) { - dict* set = (dict*)obj->ptr; +error_code RdbSerializer::SaveHSetObject(const PrimeValue& pv) { + DCHECK_EQ(OBJ_HASH, pv.ObjType()); - RETURN_ON_ERR(SaveLen(dictSize(set))); + if (pv.Encoding() == kEncodingStrMap2) { + StringMap* string_map = (StringMap*)pv.RObjPtr(); - dictIterator* di = dictGetIterator(set); - dictEntry* de; - auto cleanup = absl::MakeCleanup([di] { dictReleaseIterator(di); }); + RETURN_ON_ERR(SaveLen(string_map->Size())); - while ((de = dictNext(di)) != NULL) { - sds key = (sds)de->key; - sds value = (sds)de->v.val; - - RETURN_ON_ERR(SaveString(string_view{key, sdslen(key)})); - RETURN_ON_ERR(SaveString(string_view{value, sdslen(value)})); + for (const auto& k_v : *string_map) { + RETURN_ON_ERR(SaveString(string_view{k_v.first, sdslen(k_v.first)})); + RETURN_ON_ERR(SaveString(string_view{k_v.second, sdslen(k_v.second)})); } } 
else { - CHECK_EQ(unsigned(OBJ_ENCODING_LISTPACK), obj->encoding); - - uint8_t* lp = (uint8_t*)obj->ptr; - size_t lplen = lpLength(lp); - CHECK(lplen > 0 && lplen % 2 == 0); // has (key,value) pairs. + CHECK_EQ(kEncodingListPack, pv.Encoding()); + uint8_t* lp = (uint8_t*)pv.RObjPtr(); RETURN_ON_ERR(SaveListPackAsZiplist(lp)); } diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 6ed716d03..e6e71a615 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -156,7 +156,7 @@ class RdbSerializer { std::error_code SaveObject(const PrimeValue& pv); std::error_code SaveListObject(const robj* obj); std::error_code SaveSetObject(const PrimeValue& pv); - std::error_code SaveHSetObject(const robj* obj); + std::error_code SaveHSetObject(const PrimeValue& pv); std::error_code SaveZSetObject(const robj* obj); std::error_code SaveStreamObject(const robj* obj); std::error_code SaveLongLongAsString(int64_t value);