From 2d46a584a3225d298d1a75424ccd83c311269c8c Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Sun, 7 Jan 2024 07:48:29 +0200 Subject: [PATCH] refactor(SerializerBase): Move some logic from RdbSerializer to SerializerBase (#2373) * refactor(SerializerBase): Move some logic from RdbSerializer to SerializerBase Specifically SendFullSyncCut and WriteJournalEntry * gh review --- src/server/rdb_save.cc | 20 ++++++++++++-------- src/server/rdb_save.h | 38 ++++++++++++++++++-------------------- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 76536f6dc..53467d96d 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -282,10 +282,10 @@ io::Result Lz4Compressor::Compress(io::Bytes data) { } SerializerBase::SerializerBase(CompressionMode compression_mode) - : compression_mode_(compression_mode), mem_buf_{4_KB} { + : compression_mode_(compression_mode), mem_buf_{4_KB}, tmp_buf_(nullptr) { } -RdbSerializer::RdbSerializer(CompressionMode compression_mode) - : SerializerBase(compression_mode), tmp_buf_(nullptr) { + +RdbSerializer::RdbSerializer(CompressionMode compression_mode) : SerializerBase(compression_mode) { } RdbSerializer::~RdbSerializer() { @@ -755,7 +755,7 @@ error_code RdbSerializer::SendJournalOffset(uint64_t journal_offset) { return WriteRaw(buf); } -error_code RdbSerializer::SendFullSyncCut() { +error_code SerializerBase::SendFullSyncCut() { VLOG(2) << "SendFullSyncCut"; RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END)); @@ -767,6 +767,10 @@ error_code RdbSerializer::SendFullSyncCut() { return WriteRaw(buf); } +std::error_code SerializerBase::WriteOpcode(uint8_t opcode) { + return WriteRaw(::io::Bytes{&opcode, 1}); +} + size_t SerializerBase::GetTotalBufferCapacity() const { return mem_buf_.Capacity(); } @@ -879,7 +883,7 @@ io::Bytes SerializerBase::PrepareFlush() { return mem_buf_.InputBuffer(); } -error_code RdbSerializer::WriteJournalEntry(std::string_view serialized_entry) { +error_code SerializerBase::WriteJournalEntry(std::string_view serialized_entry) { VLOG(2) << "WriteJournalEntry"; RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB)); RETURN_ON_ERR(SaveLen(1)); @@ -887,7 +891,7 @@ error_code RdbSerializer::WriteJournalEntry(std::string_view serialized_entry) { return error_code{}; } -error_code RdbSerializer::SaveString(string_view val) { +error_code SerializerBase::SaveString(string_view val) { /* Try integer encoding */ if (val.size() <= 11) { uint8_t buf[16]; @@ -927,13 +931,13 @@ error_code RdbSerializer::SaveString(string_view val) { return error_code{}; } -error_code RdbSerializer::SaveLen(size_t len) { +error_code SerializerBase::SaveLen(size_t len) { uint8_t buf[16]; unsigned enclen = WritePackedUInt(len, buf); return WriteRaw(Bytes{buf, enclen}); } -error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_len) { +error_code SerializerBase::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_len) { /* Data compressed! Let's save it on disk */ uint8_t opcode = (RDB_ENCVAL << 6) | RDB_ENC_LZF; RETURN_ON_ERR(WriteOpcode(opcode)); diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index b0e6aab64..dc7a09bcc 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -149,6 +149,20 @@ class SerializerBase { std::error_code WriteRaw(const ::io::Bytes& buf); + // Write journal entry as an embedded journal blob. + std::error_code WriteJournalEntry(std::string_view entry); + + // Send FULL_SYNC_CUT opcode to notify that all static data was sent. + std::error_code SendFullSyncCut(); + + std::error_code WriteOpcode(uint8_t opcode); + + std::error_code SaveLen(size_t len); + std::error_code SaveString(std::string_view val); + std::error_code SaveString(const uint8_t* buf, size_t len) { + return SaveString(io::View(io::Bytes{buf, len})); + } + protected: // Prepare internal buffer for flush. Compress it. io::Bytes PrepareFlush(); @@ -157,6 +171,8 @@ class SerializerBase { void CompressBlob(); void AllocateCompressorOnce(); + std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); + CompressionMode compression_mode_; base::IoBuf mem_buf_; std::unique_ptr compressor_impl_; @@ -170,6 +186,8 @@ class SerializerBase { uint32_t compressed_blobs = 0; }; std::optional compression_stats_; + base::PODArray tmp_buf_; + std::unique_ptr lzf_; }; class RdbSerializer : public SerializerBase { @@ -190,32 +208,15 @@ class RdbSerializer : public SerializerBase { io::Result SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms, DbIndex dbid); - std::error_code SaveLen(size_t len); - std::error_code SaveString(std::string_view val); - std::error_code SaveString(const uint8_t* buf, size_t len) { - return SaveString(io::View(io::Bytes{buf, len})); - } - // This would work for either string or an object. // The arg pv is taken from it->second if accessing // this by finding the key. This function is used // for the dump command - thus it is public function std::error_code SaveValue(const PrimeValue& pv); - std::error_code WriteOpcode(uint8_t opcode) { - return WriteRaw(::io::Bytes{&opcode, 1}); - } - - // Write journal entry as an embedded journal blob. - std::error_code WriteJournalEntry(std::string_view entry); - std::error_code SendJournalOffset(uint64_t journal_offset); - // Send FULL_SYNC_CUT opcode to notify that all static data was sent. - std::error_code SendFullSyncCut(); - private: - std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); std::error_code SaveObject(const PrimeValue& pv); std::error_code SaveListObject(const robj* obj); std::error_code SaveSetObject(const PrimeValue& pv); @@ -231,10 +232,7 @@ class RdbSerializer : public SerializerBase { std::error_code SaveStreamConsumers(streamCG* cg); std::string tmp_str_; - base::PODArray tmp_buf_; DbIndex last_entry_db_index_ = kInvalidDbId; - - std::unique_ptr lzf_; }; // Serializes CompactObj as RESTORE commands.