1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: Reduce memory consumption when migrating huge values

Before this PR:

We serialized a `RESTORE` command for each entry into a string, and then
push that string to the wire.
This means that, when serializing an entry of size X, we consume 2X
memory during the migration.

This PR:

Instead of serializing into a string, we serialize into the wire
directly. Luckily, only a small modification is needed in the way we
interact with `crc64`, which works well even in chunks.

Fixes #4100
This commit is contained in:
shahar 2024-11-12 15:22:37 +02:00
parent 79aa5d490d
commit 3fb8b4631f
No known key found for this signature in database
GPG key ID: 2ED7F7EDC7FDCCA4
2 changed files with 39 additions and 14 deletions

View file

@ -933,14 +933,13 @@ VersionBuffer MakeRdbVersion() {
return buf; return buf;
} }
CrcBuffer MakeCheckSum(std::string_view dump_res) { CrcBuffer WrapCheckSum(uint64_t crc) {
uint64_t chksum = crc64(0, reinterpret_cast<const uint8_t*>(dump_res.data()), dump_res.size());
CrcBuffer buf; CrcBuffer buf;
absl::little_endian::Store64(buf.data(), chksum); absl::little_endian::Store64(buf.data(), crc);
return buf; return buf;
} }
void AppendFooter(io::StringSink* dump_res) { void AppendFooter(io::Sink* dump_res, uint64_t crc) {
auto to_bytes = [](const auto& buf) { auto to_bytes = [](const auto& buf) {
return io::Bytes(reinterpret_cast<const uint8_t*>(buf.data()), buf.size()); return io::Bytes(reinterpret_cast<const uint8_t*>(buf.data()), buf.size());
}; };
@ -952,18 +951,45 @@ void AppendFooter(io::StringSink* dump_res) {
* RDB version and CRC are both in little endian. * RDB version and CRC are both in little endian.
*/ */
const auto ver = MakeRdbVersion(); const auto ver = MakeRdbVersion();
dump_res->Write(to_bytes(ver)); auto ver_bytes = to_bytes(ver);
const auto crc = MakeCheckSum(dump_res->str()); dump_res->Write(ver_bytes);
dump_res->Write(to_bytes(crc));
crc = crc64(crc, ver_bytes.data(), ver_bytes.size());
const auto crc_buf = WrapCheckSum(crc);
dump_res->Write(to_bytes(crc_buf));
} }
} // namespace } // namespace
void SerializerBase::DumpObject(const CompactObj& obj, io::StringSink* out) { void SerializerBase::DumpObject(const CompactObj& obj, io::Sink* out) {
CompressionMode compression_mode = GetDefaultCompressionMode(); CompressionMode compression_mode = GetDefaultCompressionMode();
if (compression_mode != CompressionMode::NONE) { if (compression_mode != CompressionMode::NONE) {
compression_mode = CompressionMode::SINGLE_ENTRY; compression_mode = CompressionMode::SINGLE_ENTRY;
} }
RdbSerializer serializer(compression_mode); uint64_t crc = 0;
uint64_t total_size = 0;
auto flush = [&](RdbSerializer* srz) {
io::StringSink s;
// Use kFlushMidEntry because we append a footer below
auto ec = srz->FlushToSink(&s, SerializerBase::FlushState::kFlushMidEntry);
RETURN_ON_ERR(ec);
string_view sv = s.str();
if (sv.empty()) {
return ec; // Nothing to do
}
total_size += sv.size();
crc = crc64(crc, reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
return out->Write(io::Buffer(sv));
};
RdbSerializer serializer(compression_mode, [&](size_t size, SerializerBase::FlushState) {
auto size_before = total_size;
flush(&serializer);
DCHECK_EQ(size, total_size - size_before);
});
// According to Redis code we need to // According to Redis code we need to
// 1. Save the value itself - without the key // 1. Save the value itself - without the key
@ -974,10 +1000,9 @@ void SerializerBase::DumpObject(const CompactObj& obj, io::StringSink* out) {
CHECK(!ec); CHECK(!ec);
ec = serializer.SaveValue(obj); ec = serializer.SaveValue(obj);
CHECK(!ec); // make sure that fully was successful CHECK(!ec); // make sure that fully was successful
ec = serializer.FlushToSink(out, SerializerBase::FlushState::kFlushMidEntry); ec = flush(&serializer);
CHECK(!ec); // make sure that fully was successful CHECK(!ec); // make sure that fully was successful
AppendFooter(out); // version and crc AppendFooter(out, crc); // version and crc
CHECK_GT(out->str().size(), 10u);
} }
size_t SerializerBase::SerializedLen() const { size_t SerializerBase::SerializedLen() const {

View file

@ -156,7 +156,7 @@ class SerializerBase {
virtual ~SerializerBase() = default; virtual ~SerializerBase() = default;
// Dumps `obj` in DUMP command format into `out`. Uses default compression mode. // Dumps `obj` in DUMP command format into `out`. Uses default compression mode.
static void DumpObject(const CompactObj& obj, io::StringSink* out); static void DumpObject(const CompactObj& obj, io::Sink* out);
// Internal buffer size. Might shrink after flush due to compression. // Internal buffer size. Might shrink after flush due to compression.
size_t SerializedLen() const; size_t SerializedLen() const;