From 3fb8b4631fd3156d30b14967457bf84e92cad89a Mon Sep 17 00:00:00 2001 From: shahar Date: Tue, 12 Nov 2024 15:22:37 +0200 Subject: [PATCH] chore: Reduce memory consumption when migrating huge values Before this PR: We serialize a `RESTORE` command for each entry into a string, and then push that string to the wire. This means that, when serializing an entry of size X, we consume 2X memory during the migration. This PR: Instead of serializing into a string, we serialize into the wire directly. Luckily, only a small modification is needed in the way we interact with `crc64`, which works well even in chunks. Fixes #4100 --- src/server/rdb_save.cc | 51 +++++++++++++++++++++++++++++++----------- src/server/rdb_save.h | 2 +- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 9bab44b8f..5405aaffa 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -933,14 +933,13 @@ VersionBuffer MakeRdbVersion() { return buf; } -CrcBuffer MakeCheckSum(std::string_view dump_res) { - uint64_t chksum = crc64(0, reinterpret_cast(dump_res.data()), dump_res.size()); +CrcBuffer WrapCheckSum(uint64_t crc) { CrcBuffer buf; - absl::little_endian::Store64(buf.data(), chksum); + absl::little_endian::Store64(buf.data(), crc); return buf; } -void AppendFooter(io::StringSink* dump_res) { +void AppendFooter(io::Sink* dump_res, uint64_t crc) { auto to_bytes = [](const auto& buf) { return io::Bytes(reinterpret_cast(buf.data()), buf.size()); }; @@ -952,18 +951,45 @@ void AppendFooter(io::StringSink* dump_res) { * RDB version and CRC are both in little endian. 
*/ const auto ver = MakeRdbVersion(); - dump_res->Write(to_bytes(ver)); - const auto crc = MakeCheckSum(dump_res->str()); - dump_res->Write(to_bytes(crc)); + auto ver_bytes = to_bytes(ver); + dump_res->Write(ver_bytes); + + crc = crc64(crc, ver_bytes.data(), ver_bytes.size()); + const auto crc_buf = WrapCheckSum(crc); + dump_res->Write(to_bytes(crc_buf)); } } // namespace -void SerializerBase::DumpObject(const CompactObj& obj, io::StringSink* out) { +void SerializerBase::DumpObject(const CompactObj& obj, io::Sink* out) { CompressionMode compression_mode = GetDefaultCompressionMode(); if (compression_mode != CompressionMode::NONE) { compression_mode = CompressionMode::SINGLE_ENTRY; } - RdbSerializer serializer(compression_mode); + uint64_t crc = 0; + uint64_t total_size = 0; + auto flush = [&](RdbSerializer* srz) { + io::StringSink s; + + // Use kFlushMidEntry because we append a footer below + auto ec = srz->FlushToSink(&s, SerializerBase::FlushState::kFlushMidEntry); + RETURN_ON_ERR(ec); + + string_view sv = s.str(); + if (sv.empty()) { + return ec; // Nothing to do + } + + total_size += sv.size(); + crc = crc64(crc, reinterpret_cast(sv.data()), sv.size()); + + return out->Write(io::Buffer(sv)); + }; + + RdbSerializer serializer(compression_mode, [&](size_t size, SerializerBase::FlushState) { + auto size_before = total_size; + flush(&serializer); + DCHECK_EQ(size, total_size - size_before); + }); // According to Redis code we need to // 1. 
Save the value itself - without the key @@ -974,10 +1000,9 @@ void SerializerBase::DumpObject(const CompactObj& obj, io::StringSink* out) { CHECK(!ec); ec = serializer.SaveValue(obj); CHECK(!ec); // make sure that fully was successful - ec = serializer.FlushToSink(out, SerializerBase::FlushState::kFlushMidEntry); - CHECK(!ec); // make sure that fully was successful - AppendFooter(out); // version and crc - CHECK_GT(out->str().size(), 10u); + ec = flush(&serializer); + CHECK(!ec); // make sure that fully was successful + AppendFooter(out, crc); // version and crc } size_t SerializerBase::SerializedLen() const { diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index ae3b9272b..0689eac70 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -156,7 +156,7 @@ class SerializerBase { virtual ~SerializerBase() = default; // Dumps `obj` in DUMP command format into `out`. Uses default compression mode. - static void DumpObject(const CompactObj& obj, io::StringSink* out); + static void DumpObject(const CompactObj& obj, io::Sink* out); // Internal buffer size. Might shrink after flush due to compression. size_t SerializedLen() const;