
feat(rdb save): refactor move zstd serializer under rdb serializer (#533)

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden 2022-12-06 13:28:19 +02:00 committed by GitHub
parent 3efc5642bc
commit d74b076e18
5 changed files with 115 additions and 115 deletions
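For context, this change replaces the serializer's boolean do_entry_level_compression flag with a CompressionMode value that is threaded through RdbSaver, RdbSerializer and SliceSnapshot. The enum itself is not part of this diff; a minimal sketch of what the code below assumes (names taken from the diff, exact definition and underlying values hypothetical) could look like:

// Sketch only - the real definition lives elsewhere in the dragonfly sources.
enum class CompressionMode {
  NONE,          // no compression at all (FLAGS_compression_mode == 0)
  SINGLE_ENTRY,  // each serialized string is compressed on its own (LZF path in SaveString)
  MULTY_ENTRY,   // the whole buffered blob is zstd-compressed in FlushToSink via CompressBlob
};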

View file

@ -431,7 +431,9 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
DVLOG(1) << "Dump: key '" << key << "' successfully found, going to dump it";
std::unique_ptr<::io::StringSink> sink = std::make_unique<::io::StringSink>();
int compression_mode = absl::GetFlag(FLAGS_compression_mode);
RdbSerializer serializer(compression_mode != 0);
CompressionMode serializer_compression_mode =
compression_mode == 0 ? CompressionMode::NONE : CompressionMode::SINGLE_ENTRY;
RdbSerializer serializer(serializer_compression_mode);
// According to Redis code we need to
// 1. Save the value itself - without the key

View file

@ -166,11 +166,52 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) {
return 0; /* avoid warning */
}
RdbSerializer::RdbSerializer(bool do_compression)
: mem_buf_{4_KB}, tmp_buf_(nullptr), do_entry_level_compression_(do_compression) {
class ZstdCompressImpl {
public:
ZstdCompressImpl() {
cctx_ = ZSTD_createCCtx();
compression_level_ = absl::GetFlag(FLAGS_zstd_compression_level);
}
~ZstdCompressImpl() {
ZSTD_freeCCtx(cctx_);
VLOG(1) << "zstd compressed size: " << compressed_size_total_;
VLOG(1) << "zstd uncompressed size: " << uncompressed_size_total_;
}
io::Bytes Compress(io::Bytes str);
private:
ZSTD_CCtx* cctx_;
int compression_level_ = 1;
size_t compressed_size_total_ = 0;
size_t uncompressed_size_total_ = 0;
base::PODArray<uint8_t> compr_buf_;
};
io::Bytes ZstdCompressImpl::Compress(io::Bytes str) {
size_t buf_size = ZSTD_compressBound(str.size());
if (compr_buf_.capacity() < buf_size) {
compr_buf_.reserve(buf_size);
}
size_t compressed_size = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(),
str.data(), str.size(), compression_level_);
compressed_size_total_ += compressed_size;
uncompressed_size_total_ += str.size();
return io::Bytes(compr_buf_.data(), compressed_size);
}
RdbSerializer::RdbSerializer(CompressionMode compression_mode)
: mem_buf_{4_KB}, tmp_buf_(nullptr), compression_mode_(compression_mode) {
}
RdbSerializer::~RdbSerializer() {
VLOG(1) << "compression mode: " << uint32_t(compression_mode_);
if (compression_stats_) {
VLOG(1) << "compression not effective: " << compression_stats_->compression_no_effective;
VLOG(1) << "small string none compression applied: " << compression_stats_->small_str_count;
}
}
std::error_code RdbSerializer::SaveValue(const PrimeValue& pv) {
@ -611,6 +652,12 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) {
DVLOG(2) << "FlushToSink " << sz << " bytes";
if (compression_mode_ == CompressionMode::MULTY_ENTRY) {
CompressBlob();
// After the blob was compressed, membuf was overwritten with the compressed data
sz = mem_buf_.InputLen();
}
// interrupt point.
RETURN_ON_ERR(s->Write(mem_buf_.InputBuffer()));
mem_buf_.ConsumeInput(sz);
@ -636,7 +683,7 @@ error_code RdbSerializer::SaveString(string_view val) {
/* Try LZF compression - under 20 bytes it's unable to compress even
* aaaaaaaaaaaaaaaaaa so skip it */
size_t len = val.size();
if (do_entry_level_compression_ && len > 20) {
if ((compression_mode_ == CompressionMode::SINGLE_ENTRY) && (len > 20)) {
size_t comprlen, outlen = len;
tmp_buf_.resize(outlen + 1);
@ -786,8 +833,9 @@ class RdbSaver::Impl {
RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode,
io::Sink* sink)
: sink_(sink), shard_snapshots_(producers_len),
meta_serializer_(compression_mode != CompressionMode::NONE), channel_{128, producers_len},
compression_mode_(compression_mode) {
meta_serializer_(CompressionMode::NONE), // Note: I think there is no need for compression
// at all in the meta serializer
channel_{128, producers_len}, compression_mode_(compression_mode) {
if (align_writes) {
aligned_buf_.emplace(kBufLen, sink);
sink_ = &aligned_buf_.value();
@ -1044,75 +1092,46 @@ void RdbSaver::Cancel() {
impl_->Cancel();
}
class ZstdCompressSerializer::ZstdCompressImpl {
public:
ZstdCompressImpl() {
cctx_ = ZSTD_createCCtx();
compression_level_ = absl::GetFlag(FLAGS_zstd_compression_level);
void RdbSerializer::CompressBlob() {
if (!compression_stats_) {
compression_stats_.emplace();
}
~ZstdCompressImpl() {
ZSTD_freeCCtx(cctx_);
VLOG(1) << "zstd compressed size: " << compressed_size_total_;
VLOG(1) << "zstd uncompressed size: " << uncompressed_size_total_;
Bytes blob_to_compress = mem_buf_.InputBuffer();
size_t blob_size = blob_to_compress.size();
if (blob_size < kMinStrSizeToCompress) {
++compression_stats_->small_str_count;
return;
}
std::string_view Compress(std::string_view str);
private:
ZSTD_CCtx* cctx_;
int compression_level_ = 1;
base::PODArray<uint8_t> compr_buf_;
uint32_t compressed_size_total_ = 0;
uint32_t uncompressed_size_total_ = 0;
};
std::string_view ZstdCompressSerializer::ZstdCompressImpl::Compress(string_view str) {
size_t buf_size = ZSTD_compressBound(str.size());
if (compr_buf_.size() < buf_size) {
compr_buf_.reserve(buf_size);
}
size_t compressed_size = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(),
str.data(), str.size(), compression_level_);
compressed_size_total_ += compressed_size;
uncompressed_size_total_ += str.size();
return string_view(reinterpret_cast<const char*>(compr_buf_.data()), compressed_size);
}
ZstdCompressSerializer::ZstdCompressSerializer() {
impl_.reset(new ZstdCompressImpl());
}
optional<string> ZstdCompressSerializer::Compress(std::string_view str) {
if (str.size() < kMinStrSizeToCompress) {
++small_str_count_;
return nullopt;
// Compress the data
if (!compressor_impl_) {
compressor_impl_.reset(new ZstdCompressImpl());
}
// Compress the string
string_view compressed_res = impl_->Compress(str);
if (compressed_res.size() > str.size() * kMinCompressionReductionPrecentage) {
++compression_no_effective_;
return nullopt;
Bytes compressed_blob = compressor_impl_->Compress(blob_to_compress);
if (compressed_blob.length() > blob_size * kMinCompressionReductionPrecentage) {
++compression_stats_->compression_no_effective;
return;
}
string serialized_compressed_blob;
// Clear membuf and write the compressed blob to it
mem_buf_.ConsumeInput(blob_size);
mem_buf_.Reserve(compressed_blob.length() + 1 + 9); // reserve space for blob + opcode + len
// First write opcode for compressed string
serialized_compressed_blob.push_back(RDB_OPCODE_COMPRESSED_BLOB_START);
// Get compressed string len encoded
uint8_t buf[9];
unsigned enclen = SerializeLen(compressed_res.size(), buf);
auto dest = mem_buf_.AppendBuffer();
dest[0] = RDB_OPCODE_COMPRESSED_BLOB_START;
mem_buf_.CommitWrite(1);
// Write encoded compressed string len and then the compressed string
serialized_compressed_blob.append(reinterpret_cast<const char*>(buf), enclen);
serialized_compressed_blob.append(compressed_res);
return serialized_compressed_blob;
}
// Write encoded compressed blob len
dest = mem_buf_.AppendBuffer();
unsigned enclen = SerializeLen(compressed_blob.length(), dest.data());
mem_buf_.CommitWrite(enclen);
ZstdCompressSerializer::~ZstdCompressSerializer() {
VLOG(1) << "zstd compression not effective: " << compression_no_effective_;
VLOG(1) << "small string none compression applied: " << small_str_count_;
// Write compressed blob
dest = mem_buf_.AppendBuffer();
memcpy(dest.data(), compressed_blob.data(), compressed_blob.length());
mem_buf_.CommitWrite(compressed_blob.length());
}
} // namespace dfly
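The new CompressBlob above writes the compressed payload with a small frame: one opcode byte (RDB_OPCODE_COMPRESSED_BLOB_START), the length of the compressed data encoded with SerializeLen, and then the compressed bytes themselves. A minimal stand-alone sketch of that framing, using zstd's one-shot API and, as a simplification, a hypothetical fixed 4-byte length field instead of the RDB length encoding:

#include <zstd.h>

#include <cstdint>
#include <string>

// Hypothetical opcode value for illustration only; the real constant
// RDB_OPCODE_COMPRESSED_BLOB_START is defined in the dragonfly sources.
constexpr uint8_t kCompressedBlobStart = 0xC0;

// Frames `blob` as [opcode][4-byte length][zstd-compressed payload].
std::string FrameCompressedBlob(const std::string& blob, int level = 1) {
  std::string compressed(ZSTD_compressBound(blob.size()), '\0');
  size_t n = ZSTD_compress(compressed.data(), compressed.size(),
                           blob.data(), blob.size(), level);
  if (ZSTD_isError(n))
    return {};  // compression failed; a caller would keep the uncompressed blob
  compressed.resize(n);

  std::string out;
  out.push_back(static_cast<char>(kCompressedBlobStart));
  uint32_t len = static_cast<uint32_t>(n);
  out.append(reinterpret_cast<const char*>(&len), sizeof(len));  // simplified length field
  out.append(compressed);
  return out;
}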

View file

@ -112,32 +112,11 @@ class RdbSaver {
CompressionMode compression_mode_;
};
class ZstdCompressSerializer {
public:
ZstdCompressSerializer();
ZstdCompressSerializer(const ZstdCompressSerializer&) = delete;
void operator=(const ZstdCompressSerializer&) = delete;
~ZstdCompressSerializer();
// Returns string if compression was applied, null otherwise
std::optional<string> Compress(std::string_view str);
private:
class ZstdCompressImpl;
std::unique_ptr<ZstdCompressImpl> impl_;
static constexpr size_t kMinStrSizeToCompress = 256;
static constexpr double kMinCompressionReductionPrecentage = 0.95;
uint32_t compression_no_effective_ = 0;
uint32_t small_str_count_ = 0;
};
class ZstdCompressImpl;
class RdbSerializer {
public:
// TODO: for the aligned case, it does not make sense that RdbSerializer buffers into unaligned
// mem_buf_ and then flushes it into the next level. We should probably use AlignedBuffer
// directly.
RdbSerializer(bool do_entry_level_compression);
RdbSerializer(CompressionMode compression_mode);
~RdbSerializer();
@ -158,7 +137,6 @@ class RdbSerializer {
return SaveString(std::string_view{reinterpret_cast<const char*>(buf), len});
}
// TODO(Adi) : add flag to flush compressed blob to sink, move zstd serializer under RdbSerializer
std::error_code FlushToSink(io::Sink* s);
std::error_code SaveLen(size_t len);
@ -184,12 +162,24 @@ class RdbSerializer {
std::error_code SaveListPackAsZiplist(uint8_t* lp);
std::error_code SaveStreamPEL(rax* pel, bool nacks);
std::error_code SaveStreamConsumers(streamCG* cg);
// If membuf data is compressible, use the compression impl to compress the data and write it back to membuf
void CompressBlob();
std::unique_ptr<LZF_HSLOT[]> lzf_;
base::IoBuf mem_buf_;
base::PODArray<uint8_t> tmp_buf_;
std::string tmp_str_;
bool do_entry_level_compression_;
CompressionMode compression_mode_;
// TODO : This compressor impl should support different compression algorithms: zstd/lz4, etc.
std::unique_ptr<ZstdCompressImpl> compressor_impl_;
static constexpr size_t kMinStrSizeToCompress = 256;
static constexpr double kMinCompressionReductionPrecentage = 0.95;
struct CompressionStats {
uint32_t compression_no_effective = 0;
uint32_t small_str_count = 0;
};
std::optional<CompressionStats> compression_stats_;
};
} // namespace dfly
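The two thresholds that moved from ZstdCompressSerializer into RdbSerializer encode the policy visible in CompressBlob: blobs shorter than 256 bytes are never compressed, and a compression result is kept only if it saves at least 5% of the size. A small sketch of that predicate, with an example:

// Mirrors the checks in CompressBlob (sketch, not the actual helper).
bool CompressionWorthKeeping(size_t uncompressed, size_t compressed) {
  constexpr size_t kMinStrSizeToCompress = 256;
  constexpr double kMinCompressionReductionPrecentage = 0.95;
  if (uncompressed < kMinStrSizeToCompress)
    return false;  // counted as small_str_count
  // Counted as compression_no_effective when this fails.
  return compressed <= uncompressed * kMinCompressionReductionPrecentage;
}
// Example: a 300-byte blob compressed to 290 bytes is rejected (290 > 285),
// while 300 -> 250 is kept.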

View file

@ -51,9 +51,8 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
journal_cb_id_ = journal->RegisterOnChange(move(journal_cb));
}
bool do_compression = (compression_mode_ == CompressionMode::SINGLE_ENTRY);
default_buffer_.reset(new io::StringFile);
default_serializer_.reset(new RdbSerializer(do_compression));
default_serializer_.reset(new RdbSerializer(compression_mode_));
VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
@ -203,7 +202,10 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
optional<RdbSerializer> tmp_serializer;
RdbSerializer* serializer_ptr = default_serializer_.get();
if (db_index != current_db_) {
tmp_serializer.emplace(compression_mode_ != CompressionMode::NONE);
CompressionMode compression_mode = compression_mode_ == CompressionMode::NONE
? CompressionMode::NONE
: CompressionMode::SINGLE_ENTRY;
tmp_serializer.emplace(compression_mode);
serializer_ptr = &*tmp_serializer;
}
@ -235,21 +237,8 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
++type_freq_map_[*res];
}
void SliceSnapshot::PushFileToChannel(DbIndex db_index, bool should_compress,
io::StringFile* sfile) {
string payload = std::move(sfile->val);
if (should_compress) {
if (!zstd_serializer_) {
zstd_serializer_.reset(new ZstdCompressSerializer());
}
if (auto comp = zstd_serializer_->Compress(payload); comp) {
payload = std::move(*comp);
}
}
dest_->Push(GetDbRecord(db_index, std::move(payload)));
void SliceSnapshot::PushFileToChannel(DbIndex db_index, io::StringFile* sfile) {
dest_->Push(GetDbRecord(db_index, std::move(sfile->val)));
}
bool SliceSnapshot::FlushDefaultBuffer(bool force) {
@ -263,8 +252,7 @@ bool SliceSnapshot::FlushDefaultBuffer(bool force) {
VLOG(2) << "FlushDefaultBuffer " << default_buffer_->val.size() << " bytes";
bool multi_entries_compression = (compression_mode_ == CompressionMode::MULTY_ENTRY);
PushFileToChannel(current_db_, multi_entries_compression, default_buffer_.get());
PushFileToChannel(current_db_, default_buffer_.get());
return true;
}
@ -290,7 +278,10 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
optional<RdbSerializer> tmp_serializer;
RdbSerializer* serializer_ptr = default_serializer_.get();
if (entry.db_ind != current_db_) {
tmp_serializer.emplace(compression_mode_ != CompressionMode::NONE);
CompressionMode compression_mode = compression_mode_ == CompressionMode::NONE
? CompressionMode::NONE
: CompressionMode::SINGLE_ENTRY;
tmp_serializer.emplace(compression_mode);
serializer_ptr = &*tmp_serializer;
}
@ -327,6 +318,6 @@ void SliceSnapshot::FlushTmpSerializer(DbIndex db_index, RdbSerializer* serializ
io::StringFile sfile{};
error_code ec = serializer->FlushToSink(&sfile);
CHECK(!ec && !sfile.val.empty());
PushFileToChannel(db_index, false, &sfile);
PushFileToChannel(db_index, &sfile);
}
} // namespace dfly
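Note that in both SerializeBucket and OnJournalEntry the temporary serializer created for a side database is capped at SINGLE_ENTRY: whole-blob MULTY_ENTRY compression is reserved for the long-lived default serializer. A sketch of that selection, reusing the CompressionMode sketch from above:

// Temporary serializers never use multi-entry compression.
CompressionMode TmpSerializerMode(CompressionMode snapshot_mode) {
  return snapshot_mode == CompressionMode::NONE ? CompressionMode::NONE
                                                : CompressionMode::SINGLE_ENTRY;
}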

View file

@ -21,7 +21,6 @@ struct Entry;
} // namespace journal
class RdbSerializer;
class ZstdCompressSerializer;
//┌────────────────┐ ┌─────────────┐
//│IterateBucketsFb│ │ OnDbChange │
@ -97,7 +96,7 @@ class SliceSnapshot {
std::optional<uint64_t> expire, RdbSerializer* serializer);
// Push StringFile buffer to channel.
void PushFileToChannel(DbIndex db_index, bool should_compress, io::StringFile* sfile);
void PushFileToChannel(DbIndex db_index, io::StringFile* sfile);
// DbChange listener
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
@ -148,7 +147,6 @@ class SliceSnapshot {
// TODO : drop default_buffer from this class, we don't really need it.
std::unique_ptr<io::StringFile> default_buffer_; // filled by default_serializer_
std::unique_ptr<RdbSerializer> default_serializer_;
std::unique_ptr<ZstdCompressSerializer> zstd_serializer_;
::boost::fibers::mutex mu_;
::boost::fibers::fiber snapshot_fb_; // IterateEntriesFb