refactor: remove redundant allocations for streamer (#4225)

parent c2f8349c51
commit 071e299971

8 changed files with 213 additions and 50 deletions
@@ -281,7 +281,8 @@ void OutgoingMigration::SyncFb() {
 bool OutgoingMigration::FinalizeMigration(long attempt) {
   // if it's not the 1st attempt and flows work correctly we try to
   // reconnect and ACK one more time
-  VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
+  VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id
+          << " attempt " << attempt;
   if (attempt > 1) {
     if (cntx_.GetError()) {
       return true;

@@ -140,8 +140,13 @@ extern unsigned kernel_version;

 const char* GlobalStateName(GlobalState gs);

-template <typename RandGen> std::string GetRandomHex(RandGen& gen, size_t len) {
+template <typename RandGen>
+std::string GetRandomHex(RandGen& gen, size_t len, size_t len_deviation = 0) {
   static_assert(std::is_same<uint64_t, decltype(gen())>::value);
+  if (len_deviation) {
+    len += (gen() % len_deviation);
+  }
+
   std::string res(len, '\0');
   size_t indx = 0;

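For context, a quick usage sketch of the extended helper. This is a hypothetical snippet, not part of the commit; absl::InsecureBitGen is used because its result type is uint64_t, which the static_assert requires, and because the new unit test below uses the same generator.

#include <string>

#include "absl/random/random.h"  // absl::InsecureBitGen

// Hypothetical usage of the new len_deviation parameter.
absl::InsecureBitGen gen;
std::string fixed = GetRandomHex(gen, 10);      // always exactly 10 hex chars
std::string fuzzy = GetRandomHex(gen, 10, 90);  // 10 + (gen() % 90) hex chars, i.e. 10..99
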
@@ -778,7 +778,7 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {
     const auto& flow = info->flows[shard->shard_id()];
     if (flow.streamer)
-      streamer_bytes.fetch_add(flow.streamer->GetTotalBufferCapacities(), memory_order_relaxed);
+      streamer_bytes.fetch_add(flow.streamer->UsedBytes(), memory_order_relaxed);
     if (flow.saver)
       full_sync_bytes.fetch_add(flow.saver->GetTotalBuffersSize(), memory_order_relaxed);
   }

@@ -1,7 +1,9 @@
 #include <random>
+#include <string>

 #include "base/gtest.h"
 #include "base/logging.h"
+#include "server/journal/pending_buf.h"
 #include "server/journal/serializer.h"
 #include "server/journal/types.h"
 #include "server/serializer_commons.h"

@@ -125,5 +127,95 @@ TEST(Journal, WriteRead) {
   }
 }

+TEST(Journal, PendingBuf) {
+  PendingBuf pbuf;
+
+  ASSERT_TRUE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), 0);
+
+  pbuf.Push("one");
+  pbuf.Push(" smallllllllllllllllllllllllllllllll");
+  pbuf.Push(" test");
+
+  ASSERT_FALSE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), 44);
+
+  {
+    auto& sending_buf = pbuf.PrepareSendingBuf();
+    ASSERT_EQ(sending_buf.buf.size(), 3);
+    ASSERT_EQ(sending_buf.mem_size, 44);
+
+    ASSERT_EQ(sending_buf.buf[0], "one");
+    ASSERT_EQ(sending_buf.buf[1], " smallllllllllllllllllllllllllllllll");
+    ASSERT_EQ(sending_buf.buf[2], " test");
+  }
+
+  const size_t string_num = PendingBuf::Buf::kMaxBufSize + 1000;
+  std::vector<std::string> test_data;
+  test_data.reserve(string_num);
+
+  absl::InsecureBitGen gen;
+
+  for (size_t i = 0; i < string_num; ++i) {
+    auto str = GetRandomHex(gen, 10, 90);
+    test_data.push_back(str);
+    pbuf.Push(std::move(str));
+  }
+
+  const size_t test_data_size =
+      std::accumulate(test_data.begin(), test_data.end(), 0,
+                      [](size_t size, const auto& s) { return s.size() + size; });
+
+  ASSERT_FALSE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), 44 + test_data_size);
+
+  pbuf.Pop();
+
+  ASSERT_FALSE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), test_data_size);
+
+  {
+    auto& sending_buf = pbuf.PrepareSendingBuf();
+
+    const size_t send_buf_size =
+        std::accumulate(test_data.begin(), test_data.begin() + PendingBuf::Buf::kMaxBufSize, 0,
+                        [](size_t size, const auto& s) { return s.size() + size; });
+
+    ASSERT_EQ(sending_buf.buf.size(), PendingBuf::Buf::kMaxBufSize);
+    ASSERT_EQ(sending_buf.mem_size, send_buf_size);
+
+    for (size_t i = 0; i < sending_buf.buf.size(); ++i) {
+      ASSERT_EQ(sending_buf.buf[i], test_data[i]);
+    }
+  }
+
+  pbuf.Pop();
+
+  test_data.erase(test_data.begin(), test_data.begin() + PendingBuf::Buf::kMaxBufSize);
+
+  const size_t last_buf_size =
+      std::accumulate(test_data.begin(), test_data.end(), 0,
+                      [](size_t size, const auto& s) { return s.size() + size; });
+
+  ASSERT_FALSE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), last_buf_size);
+
+  {
+    auto& sending_buf = pbuf.PrepareSendingBuf();
+
+    ASSERT_EQ(sending_buf.buf.size(), 1000);
+    ASSERT_EQ(sending_buf.mem_size, last_buf_size);
+
+    for (size_t i = 0; i < sending_buf.buf.size(); ++i) {
+      ASSERT_EQ(sending_buf.buf[i], test_data[i]);
+    }
+  }
+
+  pbuf.Pop();
+
+  ASSERT_TRUE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), 0);
+}
+
 }  // namespace journal
 }  // namespace dfly

src/server/journal/pending_buf.h (new file, 69 lines)

@@ -0,0 +1,69 @@
+// Copyright 2024, DragonflyDB authors. All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#pragma once
+
+#include <absl/container/inlined_vector.h>
+
+#include <deque>
+#include <numeric>
+
+namespace dfly {
+
+class PendingBuf {
+ public:
+  struct Buf {
+    size_t mem_size = 0;
+    absl::InlinedVector<std::string, 8> buf;
+
+#ifdef UIO_MAXIOV
+    static constexpr size_t kMaxBufSize = UIO_MAXIOV;
+#else
+    static constexpr size_t kMaxBufSize = 1024;
+#endif
+  };
+
+  PendingBuf() : bufs_(1) {
+  }
+
+  bool Empty() const {
+    return std::all_of(bufs_.begin(), bufs_.end(), [](const auto& b) { return b.buf.empty(); });
+  }
+
+  void Push(std::string str) {
+    DCHECK(!bufs_.empty());
+    if (bufs_.back().buf.size() == Buf::kMaxBufSize) {
+      bufs_.emplace_back();
+    }
+    auto& back_buf = bufs_.back();
+
+    back_buf.mem_size += str.size();
+    back_buf.buf.push_back(std::move(str));
+  }
+
+  // should be called to get the next buffer for sending
+  const Buf& PrepareSendingBuf() {
+    // Adding a buffer ensures that future `Push()`es will not modify the in-flight buffer
+    if (bufs_.size() == 1) {
+      bufs_.emplace_back();
+    }
+    return bufs_.front();
+  }
+
+  // should be called when the buf from the PrepareSendingBuf() method was sent
+  void Pop() {
+    DCHECK(bufs_.size() >= 2);
+    bufs_.pop_front();
+  }
+
+  size_t Size() const {
+    return std::accumulate(bufs_.begin(), bufs_.end(), 0,
+                           [](size_t s, const auto& b) { return s + b.mem_size; });
+  }
+
+ private:
+  std::deque<Buf> bufs_;
+};
+
+}  // namespace dfly

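A short usage sketch of the new class (hypothetical standalone program; the producer/sender split mirrors what TEST(Journal, PendingBuf) above exercises more fully):

#include <iostream>

#include "server/journal/pending_buf.h"

// Hypothetical usage: the producer Push()es strings; the sender freezes a
// batch, writes it out, then Pop()s it.
int main() {
  dfly::PendingBuf pbuf;
  pbuf.Push("hello ");
  pbuf.Push("world");

  const auto& batch = pbuf.PrepareSendingBuf();  // later Push()es won't touch this batch
  for (const auto& s : batch.buf)
    std::cout << s;  // stand-in for the vectorized socket write
  std::cout << " (" << batch.mem_size << " bytes)\n";

  pbuf.Pop();  // the batch was sent; discard it
  // pbuf.Empty() is now true again
}
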
@@ -72,7 +72,7 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
       io::StringSink sink;
       JournalWriter writer(&sink);
       writer.Write(Entry{journal::Op::LSN, item.lsn});
-      Write(sink.str());
+      Write(std::move(sink).str());
     }
   });
 }

@@ -86,51 +86,44 @@ void JournalStreamer::Cancel() {
   }
 }

-size_t JournalStreamer::GetTotalBufferCapacities() const {
-  return in_flight_bytes_ + pending_buf_.capacity();
+size_t JournalStreamer::UsedBytes() const {
+  return pending_buf_.Size();
 }

-void JournalStreamer::Write(std::string_view str) {
-  DCHECK(!str.empty());
-  DVLOG(3) << "Writing " << str.size() << " bytes";
-
-  size_t total_pending = pending_buf_.size() + str.size();
+void JournalStreamer::AsyncWrite() {
+  DCHECK(!pending_buf_.Empty());

   if (in_flight_bytes_ > 0) {
     // We can not flush data while there are in flight requests because AsyncWrite
     // is not atomic. Therefore, we just aggregate.
-    size_t tail = pending_buf_.size();
-    pending_buf_.resize(pending_buf_.size() + str.size());
-    memcpy(pending_buf_.data() + tail, str.data(), str.size());
     return;
   }

-  // If we do not have any in flight requests we send the string right away.
-  // We can not aggregate it since we do not know when the next update will follow.
-  // because of potential SOO with strings, we allocate explicitly on heap.
-  uint8_t* buf(new uint8_t[str.size()]);
+  const auto& cur_buf = pending_buf_.PrepareSendingBuf();

-  // TODO: it is possible to remove these redundant copies if we adjust high level
-  // interfaces to pass reference-counted buffers.
-  memcpy(buf, str.data(), str.size());
-  in_flight_bytes_ += total_pending;
-  total_sent_ += total_pending;
+  in_flight_bytes_ = cur_buf.mem_size;
+  total_sent_ += cur_buf.mem_size;

-  iovec v[2];
-  unsigned next_buf_id = 0;
+  const auto v_size = cur_buf.buf.size();
+  absl::InlinedVector<iovec, 8> v(v_size);

-  if (!pending_buf_.empty()) {
-    v[0] = IoVec(pending_buf_);
-    ++next_buf_id;
+  for (size_t i = 0; i < v_size; ++i) {
+    const auto* uptr = reinterpret_cast<const uint8_t*>(cur_buf.buf[i].data());
+    v[i] = IoVec(io::Bytes(uptr, cur_buf.buf[i].size()));
   }
-  v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));

-  dest_->AsyncWrite(
-      v, next_buf_id,
-      [buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
-        delete[] buf;
-        OnCompletion(ec, len);
-      });
+  dest_->AsyncWrite(v.data(), v.size(), [this, len = cur_buf.mem_size](std::error_code ec) {
+    OnCompletion(std::move(ec), len);
+  });
+}
+
+void JournalStreamer::Write(std::string str) {
+  DCHECK(!str.empty());
+  DVLOG(2) << "Writing " << str.size() << " bytes";
+
+  pending_buf_.Push(std::move(str));
+
+  AsyncWrite();
 }

 void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {

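The new AsyncWrite builds one iovec per queued string and hands them all to a single vectorized socket write, which is what removes the heap allocation and memcpy of the old path. A minimal sketch of the same gather-write idea using plain POSIX writev (illustrative only; Dragonfly itself goes through helio's FiberSocketBase::AsyncWrite, not raw syscalls):

#include <sys/uio.h>  // writev, iovec

#include <string>
#include <vector>

// Send several strings with one syscall instead of first copying them
// into a single contiguous buffer.
ssize_t GatherWrite(int fd, const std::vector<std::string>& bufs) {
  std::vector<iovec> v(bufs.size());
  for (size_t i = 0; i < bufs.size(); ++i) {
    v[i].iov_base = const_cast<char*>(bufs[i].data());
    v[i].iov_len = bufs[i].size();
  }
  return writev(fd, v.data(), static_cast<int>(v.size()));
}
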
@@ -138,15 +131,13 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {

   DVLOG(3) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len;
   in_flight_bytes_ -= len;
+  if (in_flight_bytes_ == 0) {
+    pending_buf_.Pop();
+  }
+
   if (ec && !IsStopped()) {
     cntx_->ReportError(ec);
-  } else if (in_flight_bytes_ == 0 && !pending_buf_.empty() && !IsStopped()) {
-    // If everything was sent but we have a pending buf, flush it.
-    io::Bytes src(pending_buf_);
-    in_flight_bytes_ += src.size();
-    dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) {
-      OnCompletion(ec, buf.size());
-    });
+  } else if (!pending_buf_.Empty() && !IsStopped()) {
+    AsyncWrite();
   }

   // notify ThrottleIfNeeded or WaitForInflightToComplete that waits

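Taken together, Write, AsyncWrite, and OnCompletion form a simple chain: at most one batch is in flight, new strings accumulate in PendingBuf meanwhile, and each completion pops the sent batch and kicks off the next one. A self-contained toy model of that loop (names and the fake "async" sink are assumptions for illustration, not Dragonfly code):

#include <functional>
#include <iostream>
#include <string>
#include <vector>

class ToyStreamer {
 public:
  void Write(std::string s) {
    pending_.push_back(std::move(s));
    AsyncWrite();
  }

  // Simulates the socket invoking the completion callback.
  void CompleteInflight() {
    if (on_complete_) {
      auto cb = std::move(on_complete_);
      on_complete_ = nullptr;
      cb();
    }
  }

 private:
  void AsyncWrite() {
    if (in_flight_ > 0)
      return;  // aggregate behind the in-flight batch
    std::vector<std::string> batch;
    batch.swap(pending_);
    for (const auto& s : batch)
      in_flight_ += s.size();
    // The real streamer must keep the batch alive until completion;
    // PendingBuf's frozen front Buf serves exactly that purpose.
    std::cout << "async-writing " << batch.size() << " strings, " << in_flight_ << " bytes\n";
    on_complete_ = [this] {  // runs when the "socket" write finishes
      in_flight_ = 0;
      if (!pending_.empty())
        AsyncWrite();  // chain the next batch
    };
  }

  std::vector<std::string> pending_;
  std::function<void()> on_complete_;
  size_t in_flight_ = 0;
};

int main() {
  ToyStreamer s;
  s.Write("one");        // goes out immediately
  s.Write("two");        // aggregated while "one" is in flight
  s.Write("three");
  s.CompleteInflight();  // "two" + "three" go out as a single batch
  s.CompleteInflight();
}
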
@@ -186,7 +177,7 @@ void JournalStreamer::WaitForInflightToComplete() {
 }

 bool JournalStreamer::IsStalled() const {
-  return in_flight_bytes_ + pending_buf_.size() >= replication_stream_output_limit_cached;
+  return pending_buf_.Size() >= replication_stream_output_limit_cached;
 }

 RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,

@@ -245,7 +236,7 @@ void RestoreStreamer::SendFinalize(long attempt) {
   io::StringSink sink;
   JournalWriter writer{&sink};
   writer.Write(entry);
-  Write(sink.str());
+  Write(std::move(sink).str());

   // TODO: is the intent here to flush everything?
   //

@@ -329,7 +320,7 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                  uint64_t expire_ms) {
-  CmdSerializer serializer([&](std::string s) { Write(s); });
+  CmdSerializer serializer([&](std::string s) { Write(std::move(s)); });
   serializer.SerializeEntry(key, pk, pv, expire_ms);
 }

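The recurring pattern in these hunks, Write(std::move(sink).str()) and Write(std::move(s)), hands ownership to the new Write(std::string) overload instead of copying. For the sink case this presumably relies on a ref-qualified str() overload that moves the buffer out of an rvalue sink; whether io::StringSink actually provides one is an assumption here. A minimal illustration of the idiom with a hypothetical Sink class:

#include <string>
#include <string_view>
#include <utility>

// Hypothetical sink demonstrating ref-qualified accessors.
class Sink {
 public:
  void Append(std::string_view s) { data_.append(s); }
  const std::string& str() const& { return data_; }  // lvalue sink: borrow
  std::string str() && { return std::move(data_); }  // rvalue sink: move the buffer out

 private:
  std::string data_;
};

std::string Take(Sink sink) {
  return std::move(sink).str();  // no copy of the accumulated data
}
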
@@ -4,9 +4,12 @@

 #pragma once

+#include <deque>
+
 #include "server/common.h"
 #include "server/db_slice.h"
 #include "server/journal/journal.h"
+#include "server/journal/pending_buf.h"
 #include "server/journal/serializer.h"
 #include "server/rdb_save.h"

@@ -30,7 +33,7 @@ class JournalStreamer {
   // and manual cleanup.
   virtual void Cancel();

-  size_t GetTotalBufferCapacities() const;
+  size_t UsedBytes() const;

  protected:
   // TODO: we copy the string on each write because JournalItem may be passed to multiple

@@ -38,7 +41,7 @@ class JournalStreamer {
   // or wrap JournalItem::data in shared_ptr, we can avoid the cost of copying strings.
   // Also, for small strings it's more performant to copy to the intermediate buffer than
   // to issue an io operation.
-  void Write(std::string_view str);
+  void Write(std::string str);

   // Blocks if the consumer is not keeping up.
   void ThrottleIfNeeded();

@@ -53,6 +56,7 @@ class JournalStreamer {
   Context* cntx_;

  private:
+  void AsyncWrite();
   void OnCompletion(std::error_code ec, size_t len);

   bool IsStopped() const {

@@ -62,9 +66,10 @@ class JournalStreamer {
   bool IsStalled() const;

   journal::Journal* journal_;
-  std::vector<uint8_t> pending_buf_;
-  size_t in_flight_bytes_ = 0, total_sent_ = 0;
+
+  PendingBuf pending_buf_;
+
+  size_t in_flight_bytes_ = 0, total_sent_ = 0;
   time_t last_lsn_time_ = 0;
   util::fb2::EventCount waker_;
   uint32_t journal_cb_id_{0};

@@ -1300,7 +1300,7 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
         df_factory.create(
             port=BASE_PORT + i,
             admin_port=BASE_PORT + i + 1000,
-            vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
+            vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
         )
         for i in range(2)
     ]