
feat(rdb_load): add support for loading huge hmaps and zsets (#3823)

* feat(rdb_load): add support for loading huge hmaps

* feat(rdb_load): add support for loading huge zsets

* feat(rdb_load): log DFATAL when append fails
Andy Dunstall 2024-10-01 07:52:52 +01:00 committed by GitHub
parent 2d11a046be
commit e3214cb603
2 changed files with 177 additions and 63 deletions
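The loading-side changes below revolve around a small piece of reader state, pending_read_, that carries element counts across partial reads of a single huge object. Its definition lives outside this diff; the following is only a minimal sketch of the shape implied by the hunks (field names follow the diff, the struct layout itself is an assumption):

  // Sketch only: state carried between successive partial reads of one object.
  struct PendingRead {
    size_t remaining = 0;  // elements of the current object still unread
    size_t reserve = 0;    // full-object size hint so Create* can Reserve() once
  };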

@@ -550,12 +550,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
       // which is greater than SetFamily::MaxIntsetEntries so we'll always use
       // a string set not an int set.
       if (pv_->ObjType() != OBJ_SET) {
-        LOG(ERROR) << "Invalid RDB type " << pv_->ObjType();
+        LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType();
         ec_ = RdbError(errc::invalid_rdb_type);
         return;
       }
       if (pv_->Encoding() != kEncodingStrMap2) {
-        LOG(ERROR) << "Invalid encoding " << pv_->Encoding();
+        LOG(DFATAL) << "Invalid encoding " << pv_->Encoding();
         ec_ = RdbError(errc::invalid_encoding);
         return;
       }
@@ -662,12 +662,37 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
     lp = lpShrinkToFit(lp);
     pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
   } else {
-    StringMap* string_map = CompactObj::AllocateMR<StringMap>();
-    string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
+    StringMap* string_map;
+    if (config_.append) {
+      // Note we only use append_ when the map size exceeds kMaxBlobLen,
+      // which is greater than 64, so we'll always have a StringMap, not a
+      // listpack.
+      if (pv_->ObjType() != OBJ_HASH) {
+        LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType();
+        ec_ = RdbError(errc::invalid_rdb_type);
+        return;
+      }
+      if (pv_->Encoding() != kEncodingStrMap2) {
+        LOG(DFATAL) << "Invalid encoding " << pv_->Encoding();
+        ec_ = RdbError(errc::invalid_encoding);
+        return;
+      }
 
-    auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR<StringMap>(string_map); });
+      string_map = static_cast<StringMap*>(pv_->RObjPtr());
+    } else {
+      string_map = CompactObj::AllocateMR<StringMap>();
+      string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
+
+      // Expand the map up front to avoid rehashing.
+      string_map->Reserve((config_.reserve > len) ? config_.reserve : len);
+    }
+
+    auto cleanup = absl::MakeCleanup([&] {
+      if (!config_.append) {
+        CompactObj::DeleteMR<StringMap>(string_map);
+      }
+    });
     std::string key;
-    string_map->Reserve(len);
     for (const auto& seg : ltrace->arr) {
       for (size_t i = 0; i < seg.size(); i += increment) {
         // ToSV may reference an internal buffer, therefore we can use only before the
@@ -705,7 +730,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
         }
       }
     }
-    pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map);
+    if (!config_.append) {
+      pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map);
+    }
     std::move(cleanup).Cancel();
   }
 }
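The CreateHMap hunk above leans on a scope-guard idiom: absl::MakeCleanup installs an error-path deleter that must fire only when the loader owns the object (a fresh allocation), never when appending into a value the CompactObj already owns, and std::move(cleanup).Cancel() disarms it on success. A self-contained sketch of that ownership pattern (the Load function and int payload are illustrative, not Dragonfly code):

  #include <utility>

  #include "absl/cleanup/cleanup.h"

  // On failure, free the value only if this call allocated it; on success,
  // cancel the guard and hand ownership to the caller.
  int* Load(bool append, int* existing, bool fail) {
    int* value = append ? existing : new int(0);
    auto cleanup = absl::MakeCleanup([&] {
      if (!append)
        delete value;
    });
    if (fail)
      return nullptr;             // guard fires; deletes value only if owned
    std::move(cleanup).Cancel();  // success path: guard disarmed
    return value;
  }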
@@ -780,16 +807,42 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
 
 void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
   size_t zsetlen = ltrace->blob_count();
-  detail::SortedMap* zs = CompactObj::AllocateMR<detail::SortedMap>();
-  unsigned encoding = OBJ_ENCODING_SKIPLIST;
-  auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR<detail::SortedMap>(zs); });
-  if (zsetlen > 2 && !zs->Reserve(zsetlen)) {
-    LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
-    ec_ = RdbError(errc::out_of_memory);
-    return;
+  unsigned encoding = OBJ_ENCODING_SKIPLIST;
+  detail::SortedMap* zs;
+  if (config_.append) {
+    // Note we only use append_ when the set size exceeds kMaxBlobLen,
+    // which is greater than server.zset_max_listpack_entries, so we'll
+    // always have a SortedMap, not a listpack.
+    if (pv_->ObjType() != OBJ_ZSET) {
+      LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType();
+      ec_ = RdbError(errc::invalid_rdb_type);
+      return;
+    }
+    if (pv_->Encoding() != OBJ_ENCODING_SKIPLIST) {
+      LOG(DFATAL) << "Invalid encoding " << pv_->Encoding();
+      ec_ = RdbError(errc::invalid_encoding);
+      return;
+    }
+    zs = static_cast<detail::SortedMap*>(pv_->RObjPtr());
+  } else {
+    zs = CompactObj::AllocateMR<detail::SortedMap>();
+    size_t reserve = (config_.reserve > zsetlen) ? config_.reserve : zsetlen;
+    if (reserve > 2 && !zs->Reserve(reserve)) {
+      LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
+      ec_ = RdbError(errc::out_of_memory);
+      return;
+    }
   }
+  auto cleanup = absl::MakeCleanup([&] {
+    if (!config_.append) {
+      CompactObj::DeleteMR<detail::SortedMap>(zs);
+    }
+  });
 
   size_t maxelelen = 0, totelelen = 0;
   Iterate(*ltrace, [&](const LoadBlob& blob) {
@@ -827,7 +880,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
 
   std::move(cleanup).Cancel();
 
-  pv_->InitRobj(OBJ_ZSET, encoding, inner);
+  if (!config_.append) {
+    pv_->InitRobj(OBJ_ZSET, encoding, inner);
+  }
 }
 
 void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
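One detail shared by both Create paths above: on the first (non-append) chunk, config_.reserve holds the full object size that the reader cached when it parsed the length prefix, so the container can be sized once for all subsequent chunks; taking the larger of the hint and the current chunk length guards against an unset hint. In effect the ternary is just a max (a trivial restatement with illustrative names, not Dragonfly code):

  #include <algorithm>
  #include <cstddef>

  // Capacity for the first chunk of a chunked object: the cached
  // full-object hint if present, otherwise the chunk length itself.
  size_t InitialCapacity(size_t reserve_hint, size_t chunk_len) {
    return std::max(reserve_hint, chunk_len);  // == (hint > len ? hint : len)
  }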
@@ -1586,65 +1641,84 @@ auto RdbLoaderBase::ReadGeneric(int rdbtype) -> io::Result<OpaqueObj> {
 
 auto RdbLoaderBase::ReadHMap(int rdbtype) -> io::Result<OpaqueObj> {
   size_t len;
-  SET_OR_UNEXPECT(LoadLen(nullptr), len);
-
-  unique_ptr<LoadTrace> load_trace(new LoadTrace);
-
-  if (rdbtype == RDB_TYPE_HASH) {
-    len *= 2;
+  if (pending_read_.remaining > 0) {
+    len = pending_read_.remaining;
   } else {
-    DCHECK_EQ(rdbtype, RDB_TYPE_HASH_WITH_EXPIRY);
-    len *= 3;
+    SET_OR_UNEXPECT(LoadLen(NULL), len);
+    if (rdbtype == RDB_TYPE_HASH) {
+      len *= 2;
+    } else {
+      DCHECK_EQ(rdbtype, RDB_TYPE_HASH_WITH_EXPIRY);
+      len *= 3;
+    }
+    pending_read_.reserve = len;
   }
 
-  load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen);
-  for (size_t i = 0; i < load_trace->arr.size(); ++i) {
-    size_t n = std::min<size_t>(len, kMaxBlobLen);
-    load_trace->arr[i].resize(n);
-    for (size_t j = 0; j < n; ++j) {
-      error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var);
-      if (ec)
-        return make_unexpected(ec);
-    }
-    len -= n;
+  // Limit each read to kMaxBlobLen elements.
+  unique_ptr<LoadTrace> load_trace(new LoadTrace);
+  load_trace->arr.resize(1);
+  size_t n = std::min<size_t>(len, kMaxBlobLen);
+  load_trace->arr[0].resize(n);
+  for (size_t i = 0; i < n; ++i) {
+    error_code ec = ReadStringObj(&load_trace->arr[0][i].rdb_var);
+    if (ec)
+      return make_unexpected(ec);
   }
 
+  // If there are still unread elements, cache the number of remaining
+  // elements, or clear if the full object has been read.
+  if (len > n) {
+    pending_read_.remaining = len - n;
+  } else if (pending_read_.remaining > 0) {
+    pending_read_.remaining = 0;
+  }
+
   return OpaqueObj{std::move(load_trace), rdbtype};
 }
 
 auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result<OpaqueObj> {
   /* Read sorted set value. */
   uint64_t zsetlen;
-  SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen);
+  if (pending_read_.remaining > 0) {
+    zsetlen = pending_read_.remaining;
+  } else {
+    SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen);
+    pending_read_.reserve = zsetlen;
+  }
 
   if (zsetlen == 0)
     return Unexpected(errc::empty_key);
 
-  unique_ptr<LoadTrace> load_trace(new LoadTrace);
-  load_trace->arr.resize((zsetlen + kMaxBlobLen - 1) / kMaxBlobLen);
-
   double score;
 
-  for (size_t i = 0; i < load_trace->arr.size(); ++i) {
-    size_t n = std::min<size_t>(zsetlen, kMaxBlobLen);
-    load_trace->arr[i].resize(n);
-    for (size_t j = 0; j < n; ++j) {
-      error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var);
-      if (ec)
-        return make_unexpected(ec);
-
-      if (rdbtype == RDB_TYPE_ZSET_2) {
-        SET_OR_UNEXPECT(FetchBinaryDouble(), score);
-      } else {
-        SET_OR_UNEXPECT(FetchDouble(), score);
-      }
-
-      if (isnan(score)) {
-        LOG(ERROR) << "Zset with NAN score detected";
-        return Unexpected(errc::rdb_file_corrupted);
-      }
-      load_trace->arr[i][j].score = score;
+  // Limit each read to kMaxBlobLen elements.
+  unique_ptr<LoadTrace> load_trace(new LoadTrace);
+  load_trace->arr.resize(1);
+  size_t n = std::min<size_t>(zsetlen, kMaxBlobLen);
+  load_trace->arr[0].resize(n);
+  for (size_t i = 0; i < n; ++i) {
+    error_code ec = ReadStringObj(&load_trace->arr[0][i].rdb_var);
+    if (ec)
+      return make_unexpected(ec);
+
+    if (rdbtype == RDB_TYPE_ZSET_2) {
+      SET_OR_UNEXPECT(FetchBinaryDouble(), score);
+    } else {
+      SET_OR_UNEXPECT(FetchDouble(), score);
    }
-    zsetlen -= n;
+
+    if (isnan(score)) {
+      LOG(ERROR) << "Zset with NAN score detected";
+      return Unexpected(errc::rdb_file_corrupted);
+    }
+    load_trace->arr[0][i].score = score;
  }
 
+  // If there are still unread elements, cache the number of remaining
+  // elements, or clear if the full object has been read.
+  if (zsetlen > n) {
+    pending_read_.remaining = zsetlen - n;
+  } else if (pending_read_.remaining > 0) {
+    pending_read_.remaining = 0;
+  }
+
  return OpaqueObj{std::move(load_trace), rdbtype};
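Read together, ReadHMap and ReadZSet now return at most kMaxBlobLen elements per call, stashing the leftover count in pending_read_.remaining so the next call resumes the same object without re-reading its length prefix; the caller creates the object from the first chunk and applies later chunks with config_.append set. A self-contained model of that protocol, reusing the PendingRead shape sketched earlier (all names illustrative; this simulates the control flow, not the actual RdbLoader plumbing):

  #include <algorithm>
  #include <cstddef>
  #include <vector>

  constexpr size_t kMaxBlobLen = 4096;  // per-chunk element cap, as in the diff

  struct PendingRead {
    size_t remaining = 0;  // unread elements of the current object
    size_t reserve = 0;    // full-object size hint for the first chunk
  };

  // Return the next chunk of `src`, at most kMaxBlobLen elements, mirroring
  // the resume/cache logic of ReadHMap/ReadZSet above.
  std::vector<int> ReadChunk(const std::vector<int>& src, PendingRead& pending) {
    size_t len;
    if (pending.remaining > 0) {
      len = pending.remaining;  // resume a partially read object
    } else {
      len = src.size();         // fresh object: cache the full size hint
      pending.reserve = len;
    }
    size_t n = std::min(len, kMaxBlobLen);
    size_t offset = src.size() - len;  // elements already consumed
    std::vector<int> chunk(src.begin() + offset, src.begin() + offset + n);
    pending.remaining = (len > n) ? len - n : 0;
    return chunk;
  }

  int main() {
    std::vector<int> object(10000, 7);  // a "huge" object: > kMaxBlobLen elements
    PendingRead pending;
    std::vector<int> loaded;
    bool first = true;
    do {
      std::vector<int> chunk = ReadChunk(object, pending);
      if (first)
        loaded.reserve(pending.reserve);  // size once up front, then append
      loaded.insert(loaded.end(), chunk.begin(), chunk.end());
      first = false;  // subsequent chunks are "append" mode
    } while (pending.remaining > 0);
    return loaded.size() == object.size() ? 0 : 1;
  }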

@@ -567,11 +567,11 @@ TEST_F(RdbTest, DflyLoadAppend) {
 
 // Tests loading a huge set, where the set is loaded in multiple partial reads.
 TEST_F(RdbTest, LoadHugeSet) {
-  // Add 2 sets with 200k elements each (note must have more than kMaxBlobLen
+  // Add 2 sets with 100k elements each (note must have more than kMaxBlobLen
   // elements to test partial reads).
-  Run({"debug", "populate", "2", "test", "100", "rand", "type", "set", "elements", "200000"});
-  ASSERT_EQ(200000, CheckedInt({"scard", "test:0"}));
-  ASSERT_EQ(200000, CheckedInt({"scard", "test:1"}));
+  Run({"debug", "populate", "2", "test", "100", "rand", "type", "set", "elements", "100000"});
+  ASSERT_EQ(100000, CheckedInt({"scard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"scard", "test:1"}));
 
   RespExpr resp = Run({"save", "df"});
   ASSERT_EQ(resp, "OK");
@@ -580,8 +580,48 @@ TEST_F(RdbTest, LoadHugeSet) {
   resp = Run({"dfly", "load", save_info.file_name});
   ASSERT_EQ(resp, "OK");
 
-  ASSERT_EQ(200000, CheckedInt({"scard", "test:0"}));
-  ASSERT_EQ(200000, CheckedInt({"scard", "test:1"}));
+  ASSERT_EQ(100000, CheckedInt({"scard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"scard", "test:1"}));
 }
 
+// Tests loading a huge hmap, where the map is loaded in multiple partial
+// reads.
+TEST_F(RdbTest, LoadHugeHMap) {
+  // Add 2 hashes with 100k fields each (note must have more than kMaxBlobLen
+  // elements to test partial reads).
+  Run({"debug", "populate", "2", "test", "100", "rand", "type", "hash", "elements", "100000"});
+  ASSERT_EQ(100000, CheckedInt({"hlen", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"}));
+
+  RespExpr resp = Run({"save", "df"});
+  ASSERT_EQ(resp, "OK");
+
+  auto save_info = service_->server_family().GetLastSaveInfo();
+  resp = Run({"dfly", "load", save_info.file_name});
+  ASSERT_EQ(resp, "OK");
+
+  ASSERT_EQ(100000, CheckedInt({"hlen", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"}));
+}
+
+// Tests loading a huge zset, where the zset is loaded in multiple partial
+// reads.
+TEST_F(RdbTest, LoadHugeZSet) {
+  // Add 2 zsets with 100k elements each (note must have more than kMaxBlobLen
+  // elements to test partial reads).
+  Run({"debug", "populate", "2", "test", "100", "rand", "type", "zset", "elements", "100000"});
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"}));
+
+  RespExpr resp = Run({"save", "df"});
+  ASSERT_EQ(resp, "OK");
+
+  auto save_info = service_->server_family().GetLastSaveInfo();
+  resp = Run({"dfly", "load", save_info.file_name});
+  ASSERT_EQ(resp, "OK");
+
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"}));
+}
+
 } // namespace dfly