
chore: serialize SBF (#2846)

* chore: serialize SBF

SAVE/LOAD supports SBF now. Also fixes MallocUsed for SBF.

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Authored by Roman Gershman on 2024-04-09 09:57:24 +03:00; committed by GitHub.
parent b1e688b33f
commit 57d567639c
8 changed files with 107 additions and 4 deletions
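
For reference, the serialized SBF payload introduced by this commit is a flat sequence of length-encoded integers, binary doubles, and string blobs. Below is a minimal sketch of that layout, assuming the field order used by SaveSBFObject and ReadSBF in the diffs that follow; the struct and field names are illustrative only and do not exist in the Dragonfly sources.

#include <cstdint>
#include <string>
#include <vector>

// Illustrative mirror of the RDB_TYPE_SBF record. Field order follows
// SaveSBFObject/ReadSBF in this commit; the struct itself is hypothetical.
struct SbfRecordSketch {
  uint64_t options = 0;         // SaveLen(0): reserved, must load back as 0
  double grow_factor;           // SaveBinaryDouble
  double fp_prob;               // SaveBinaryDouble: must satisfy 0 < fp_prob <= 0.5
  uint64_t prev_size;           // SaveLen
  uint64_t current_size;        // SaveLen
  uint64_t max_capacity;        // SaveLen
  struct Filter {
    uint64_t hash_cnt;          // SaveLen: number of hash functions in this filter
    std::string blob;           // SaveString: bit length must be a power of two
  };
  std::vector<Filter> filters;  // preceded by SaveLen(num_filters)
};

The RdbTest.SBF test at the end of this commit exercises the round trip: BF.ADD, DEBUG RELOAD, then TYPE and BF.EXISTS on the reloaded key.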


@@ -1051,7 +1051,7 @@ size_t CompactObj::MallocUsed() const {
}
if (taglen_ == SBF_TAG) {
- return 0; // TODO: to track SBF memory utilization.
+ return u_.sbf->MallocUsed();
}
LOG(DFATAL) << "should not reach";
return 0;


@@ -369,7 +369,7 @@ TEST_F(CompactObjectTest, Hash) {
TEST_F(CompactObjectTest, SBF) {
cobj_.SetSBF(1000, 0.001, 2);
EXPECT_EQ(cobj_.ObjType(), OBJ_SBF);
- EXPECT_EQ(0, cobj_.MallocUsed());
+ EXPECT_GT(cobj_.MallocUsed(), 0);
}
TEST_F(CompactObjectTest, MimallocUnderutilzation) {


@@ -13,10 +13,12 @@ constexpr uint8_t RDB_TYPE_JSON_OLD = 20;
constexpr uint8_t RDB_TYPE_JSON = 30;
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
+ constexpr uint8_t RDB_TYPE_SBF = 33;
constexpr bool rdbIsObjectTypeDF(uint8_t type) {
return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) ||
- (type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY);
+ (type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) ||
+ (type == RDB_TYPE_SBF);
}
// Opcodes: Range 200-240 is used by DF extensions.


@@ -29,6 +29,7 @@ extern "C" {
#include "base/endian.h"
#include "base/flags.h"
#include "base/logging.h"
#include "core/bloom.h"
#include "core/json/json_object.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
@@ -382,6 +383,7 @@ class RdbLoaderBase::OpaqueObjLoader {
void operator()(const base::PODArray<char>& str);
void operator()(const LzfString& lzfstr);
void operator()(const unique_ptr<LoadTrace>& ptr);
void operator()(const RdbSBF& src);
std::error_code ec() const {
return ec_;
@@ -466,6 +468,16 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr
}
}
void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbSBF& src) {
SBF* sbf =
CompactObj::AllocateMR<SBF>(src.grow_factor, src.fp_prob, src.max_capacity, src.prev_size,
src.current_size, CompactObj::memory_resource());
for (unsigned i = 0; i < src.filters.size(); ++i) {
sbf->AddFilter(src.filters[i].blob, src.filters[i].hash_cnt);
}
pv_->SetSBF(sbf);
}
void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count();
@@ -1367,6 +1379,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
case RDB_TYPE_MODULE_2:
iores = ReadRedisJson();
break;
case RDB_TYPE_SBF:
iores = ReadSBF();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
@@ -1827,6 +1842,39 @@ auto RdbLoaderBase::ReadJson() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
}
auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
RdbSBF res;
uint64_t options;
SET_OR_UNEXPECT(LoadLen(nullptr), options);
if (options != 0)
return Unexpected(errc::rdb_file_corrupted);
SET_OR_UNEXPECT(FetchBinaryDouble(), res.grow_factor);
SET_OR_UNEXPECT(FetchBinaryDouble(), res.fp_prob);
if (res.fp_prob <= 0 || res.fp_prob > 0.5) {
return Unexpected(errc::rdb_file_corrupted);
}
SET_OR_UNEXPECT(LoadLen(nullptr), res.prev_size);
SET_OR_UNEXPECT(LoadLen(nullptr), res.current_size);
SET_OR_UNEXPECT(LoadLen(nullptr), res.max_capacity);
unsigned num_filters = 0;
SET_OR_UNEXPECT(LoadLen(nullptr), num_filters);
auto is_power2 = [](size_t n) { return (n & (n - 1)) == 0; };
for (unsigned i = 0; i < num_filters; ++i) {
unsigned hash_cnt;
string filter_data;
SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
SET_OR_UNEXPECT(FetchGenericString(), filter_data);
size_t bit_len = filter_data.size() * 8;
if (!is_power2(bit_len)) { // must be power of two
return Unexpected(errc::rdb_file_corrupted);
}
res.filters.emplace_back(hash_cnt, std::move(filter_data));
}
return OpaqueObj{std::move(res), RDB_TYPE_SBF};
}
template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
auto ec = EnsureRead(sizeof(T));
if (ec)


@@ -38,8 +38,22 @@ class RdbLoaderBase {
uint64_t uncompressed_len;
};
struct RdbSBF {
double grow_factor, fp_prob;
size_t prev_size, current_size;
size_t max_capacity;
struct Filter {
unsigned hash_cnt;
std::string blob;
Filter(unsigned h, std::string b) : hash_cnt(h), blob(std::move(b)) {
}
};
std::vector<Filter> filters;
};
using RdbVariant =
- std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>>;
+ std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>, RdbSBF>;
struct OpaqueObj {
RdbVariant obj;
@@ -131,6 +145,7 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadStreams();
::io::Result<OpaqueObj> ReadRedisJson();
::io::Result<OpaqueObj> ReadJson();
::io::Result<OpaqueObj> ReadSBF();
std::error_code SkipModuleData();
std::error_code HandleCompressedBlob(int op_type);


@@ -27,6 +27,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "core/bloom.h"
#include "core/json/json_object.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
@@ -194,6 +195,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
case OBJ_JSON:
return RDB_TYPE_JSON; // save with RDB_TYPE_JSON, deprecate RDB_TYPE_JSON_OLD after July
// 2024.
case OBJ_SBF:
return RDB_TYPE_SBF;
}
LOG(FATAL) << "Unknown encoding " << compact_enc << " for type " << type;
return 0; /* avoid warning */
@@ -395,6 +398,10 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
return SaveJsonObject(pv);
}
if (obj_type == OBJ_SBF) {
return SaveSBFObject(pv);
}
LOG(ERROR) << "Not implemented " << obj_type;
return make_error_code(errc::function_not_supported);
}
@@ -620,6 +627,28 @@ error_code RdbSerializer::SaveJsonObject(const PrimeValue& pv) {
return SaveString(json_string);
}
std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
SBF* sbf = pv.GetSBF();
// options to allow format mutations in the future.
RETURN_ON_ERR(SaveLen(0)); // options - reserved
RETURN_ON_ERR(SaveBinaryDouble(sbf->grow_factor()));
RETURN_ON_ERR(SaveBinaryDouble(sbf->fp_probability()));
RETURN_ON_ERR(SaveLen(sbf->prev_size()));
RETURN_ON_ERR(SaveLen(sbf->current_size()));
RETURN_ON_ERR(SaveLen(sbf->max_capacity()));
RETURN_ON_ERR(SaveLen(sbf->num_filters()));
for (unsigned i = 0; i < sbf->num_filters(); ++i) {
RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i)));
string_view blob = sbf->data(i);
RETURN_ON_ERR(SaveString(blob));
}
return {};
}
/* Save a long long value as either an encoded string or a string. */
error_code RdbSerializer::SaveLongLongAsString(int64_t value) {
uint8_t buf[32];


@@ -230,6 +230,7 @@ class RdbSerializer : public SerializerBase {
std::error_code SaveZSetObject(const PrimeValue& pv);
std::error_code SaveStreamObject(const PrimeValue& obj);
std::error_code SaveJsonObject(const PrimeValue& pv);
std::error_code SaveSBFObject(const PrimeValue& pv);
std::error_code SaveLongLongAsString(int64_t value);
std::error_code SaveBinaryDouble(double val);


@@ -534,4 +534,12 @@ TEST_F(RdbTest, RedisJson) {
"{\"company\":\"DragonflyDB\",\"product\":\"Dragonfly\",\"website\":\"https://"
"dragondlydb.io\",\"years-active\":[2021,2022,2023,2024,\"and more!\"]}");
}
TEST_F(RdbTest, SBF) {
EXPECT_THAT(Run({"BF.ADD", "k", "1"}), IntArg(1));
Run({"debug", "reload"});
EXPECT_EQ(Run({"type", "k"}), "MBbloom--");
EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1));
}
} // namespace dfly