1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(server): Account for serializer's temporary buffer size (#2689)

* feat(server): Account for serializer's temporary buffer size

* gh comments
This commit is contained in:
Shahar Mike 2024-03-06 13:39:32 +02:00 committed by GitHub
parent 35b0ab101e
commit 66b87e16c2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 30 additions and 8 deletions

View file

@ -758,10 +758,14 @@ std::error_code SerializerBase::WriteOpcode(uint8_t opcode) {
return WriteRaw(::io::Bytes{&opcode, 1});
}
size_t SerializerBase::GetTotalBufferCapacity() const {
size_t SerializerBase::GetBufferCapacity() const {
return mem_buf_.Capacity();
}
size_t SerializerBase::GetTempBufferSize() const {
return tmp_buf_.size();
}
error_code SerializerBase::WriteRaw(const io::Bytes& buf) {
mem_buf_.Reserve(mem_buf_.InputLen() + buf.size());
IoBuf::Bytes dest = mem_buf_.AppendBuffer();
@ -1253,7 +1257,8 @@ size_t RdbSaver::Impl::GetTotalBuffersSize() const {
auto cb = [this, &channel_bytes, &serializer_bytes](ShardId sid) {
auto& snapshot = shard_snapshots_[sid];
channel_bytes.fetch_add(snapshot->GetTotalChannelCapacity(), memory_order_relaxed);
serializer_bytes.store(snapshot->GetTotalBufferCapacity(), memory_order_relaxed);
serializer_bytes.store(snapshot->GetBufferCapacity() + snapshot->GetTempBuffersSize(),
memory_order_relaxed);
};
if (shard_snapshots_.size() == 1) {
@ -1549,4 +1554,8 @@ void SerializerBase::CompressBlob() {
++compression_stats_->compressed_blobs;
}
size_t RdbSerializer::GetTempBufferSize() const {
return SerializerBase::GetTempBufferSize() + tmp_str_.size();
}
} // namespace dfly

View file

@ -151,7 +151,8 @@ class SerializerBase {
// Flush internal buffer to sink.
virtual std::error_code FlushToSink(io::Sink* s);
size_t GetTotalBufferCapacity() const;
size_t GetBufferCapacity() const;
virtual size_t GetTempBufferSize() const;
std::error_code WriteRaw(const ::io::Bytes& buf);
@ -219,6 +220,8 @@ class RdbSerializer : public SerializerBase {
std::error_code SendJournalOffset(uint64_t journal_offset);
size_t GetTempBufferSize() const override;
private:
std::error_code SaveObject(const PrimeValue& pv);
std::error_code SaveListObject(const PrimeValue& pv);

View file

@ -43,7 +43,7 @@ SliceSnapshot::~SliceSnapshot() {
size_t SliceSnapshot::GetThreadLocalMemoryUsage() {
size_t mem = 0;
for (SliceSnapshot* snapshot : tl_slice_snapshots) {
mem += snapshot->GetTotalBufferCapacity() + snapshot->GetTotalChannelCapacity();
mem += snapshot->GetBufferCapacity() + snapshot->GetTotalChannelCapacity();
}
return mem;
}
@ -352,18 +352,26 @@ void SliceSnapshot::CloseRecordChannel() {
}
}
size_t SliceSnapshot::GetTotalBufferCapacity() const {
size_t SliceSnapshot::GetBufferCapacity() const {
if (serializer_ == nullptr) {
return 0;
}
return serializer_->GetTotalBufferCapacity();
return serializer_->GetBufferCapacity();
}
size_t SliceSnapshot::GetTotalChannelCapacity() const {
return dest_->GetSize();
}
size_t SliceSnapshot::GetTempBuffersSize() const {
if (serializer_ == nullptr) {
return 0;
}
return serializer_->GetTempBufferSize();
}
RdbSaver::SnapshotStats SliceSnapshot::GetCurrentSnapshotProgress() const {
return {stats_.loop_serialized + stats_.side_saved, stats_.keys_total};
}

View file

@ -124,8 +124,10 @@ class SliceSnapshot {
return type_freq_map_;
}
size_t GetTotalBufferCapacity() const; // In bytes
size_t GetTotalChannelCapacity() const; // In bytes
// Get different sizes, in bytes. All disjoint.
size_t GetBufferCapacity() const;
size_t GetTotalChannelCapacity() const;
size_t GetTempBuffersSize() const;
RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;