
chore(facade): Update SinkReplyBuilder2 (#3474)

Author: Vladislav, 2024-08-08 21:40:49 +03:00 (committed by GitHub)
Parent: fb7a6a724d
Commit: 5258bbacd1
2 changed files with 163 additions and 48 deletions
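
In short, this change replaces SinkReplyBuilder2's single Write entrypoint with RAII guards (ReplyScope for zero-copy vectorized io, ReplyAggregator for buffered batching), makes Flush() public and instruments it with reply stats, implements the previously stubbed FinishScope() so that external references are copied into the backing buffer while batching, and adds the common SendError/SendOk high level interface to the base class.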

src/facade/reply_builder.cc
@@ -193,28 +193,49 @@ size_t SinkReplyBuilder::UsedMemory() const {
   return dfly::HeapSize(batch_);
 }
 
-void SinkReplyBuilder2::Write(std::string_view str) {
-  DCHECK(scoped_);
-  if (str.size() >= 32)
-    WriteRef(str);
-  else
-    WritePiece(str);
+SinkReplyBuilder2::ReplyAggregator::~ReplyAggregator() {
+  rb->batched_ = prev;
+  if (!prev)
+    rb->Flush();
+}
+
+SinkReplyBuilder2::ReplyScope::~ReplyScope() {
+  rb->scoped_ = prev;
+  if (!prev)
+    rb->FinishScope();
+}
+
+void SinkReplyBuilder2::SendError(ErrorReply error) {
+  if (error.status)
+    return SendError(*error.status);
+  SendError(error.ToSv(), error.kind);
+}
+
+void SinkReplyBuilder2::SendError(OpStatus status) {
+  if (status == OpStatus::OK)
+    return SendOk();
+  SendError(StatusToMsg(status));
+}
+
+void SinkReplyBuilder2::CloseConnection() {
+  if (!ec_)
+    ec_ = std::make_error_code(std::errc::connection_aborted);
 }
 
 char* SinkReplyBuilder2::ReservePiece(size_t size) {
-  if (buffer_.AppendBuffer().size() <= size)
+  if (buffer_.AppendLen() <= size)
     Flush();
 
   char* dest = reinterpret_cast<char*>(buffer_.AppendBuffer().data());
-  if (vecs_.empty() || !IsInBuf(vecs_.back().iov_base))
+
+  // Start a new vec for the piece if the last one doesn't point at the buffer tail
+  if (vecs_.empty() || ((char*)vecs_.back().iov_base) + vecs_.back().iov_len != dest)
     NextVec({dest, 0});
+
   return dest;
 }
 
 void SinkReplyBuilder2::CommitPiece(size_t size) {
-  DCHECK(IsInBuf(vecs_.back().iov_base));
   buffer_.CommitWrite(size);
   vecs_.back().iov_len += size;
   total_size_ += size;
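
The guard destructors above restore the previous flag value and only act at the outermost level. A minimal standalone sketch of this save/restore pattern, with illustrative Builder/Scope names that are not part of the commit:

#include <iostream>
#include <utility>

struct Builder {
  bool scoped = false;
  void FinishScope() { std::cout << "FinishScope\n"; }
};

struct Scope {
  bool prev;     // flag value before this guard was entered
  Builder* b;
  explicit Scope(Builder* b) : prev(std::exchange(b->scoped, true)), b(b) {}
  ~Scope() {
    b->scoped = prev;
    if (!prev)            // only the outermost scope finishes
      b->FinishScope();
  }
};

int main() {
  Builder builder;
  Scope outer(&builder);
  {
    Scope inner(&builder);  // nested: prev == true, destructor is a no-op
  }
  return 0;
}  // outer's destructor runs FinishScope exactly once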
@@ -232,30 +253,56 @@ void SinkReplyBuilder2::WriteRef(std::string_view str) {
 }
 
 void SinkReplyBuilder2::Flush() {
-  auto ec = sink_->Write(vecs_.data(), vecs_.size());
-  if (ec)
+  auto& reply_stats = tl_facade_stats->reply_stats;
+
+  send_active_ = true;
+  uint64_t before_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
+  reply_stats.io_write_cnt++;
+  reply_stats.io_write_bytes += total_size_;
+
+  if (auto ec = sink_->Write(vecs_.data(), vecs_.size()); ec)
     ec_ = ec;
+
+  uint64_t after_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
+  reply_stats.send_stats.count++;
+  reply_stats.send_stats.total_duration += (after_ns - before_ns) / 1'000;
+  send_active_ = false;
+
+  if (buffer_.InputLen() * 2 > buffer_.Capacity())  // If needed, grow backing buffer
+    buffer_.Reserve(std::min(kMaxBufferSize, buffer_.Capacity() * 2));
+
+  total_size_ = 0;
   buffer_.Clear();
   vecs_.clear();
-  total_size_ = 0;
+  guaranteed_pieces_ = 0;
 }
 
 void SinkReplyBuilder2::FinishScope() {
-  // If batching or aggregations are not enabled, flush
-  Flush();
-  // TODO: otherwise iterate over vec_ and copy items to buffer_
-  // whilst also updating their pointers
-}
+  if (!batched_ || total_size_ * 2 >= kMaxBufferSize)
+    return Flush();
 
-bool SinkReplyBuilder2::IsInBuf(const void* ptr) const {
-  auto ib = buffer_.InputBuffer();
-  return ptr >= ib.data() && ptr <= ib.data() + ib.size();
+  // Check if we have enough space to copy all refs to the buffer
+  size_t ref_bytes = total_size_ - buffer_.InputLen();
+  if (ref_bytes > buffer_.AppendLen())
+    return Flush();
+
+  // Copy all external references into the buffer so batching stays safe
+  for (size_t i = guaranteed_pieces_; i < vecs_.size(); i++) {
+    auto ib = buffer_.InputBuffer();
+    if (vecs_[i].iov_base >= ib.data() && vecs_[i].iov_base <= ib.data() + ib.size())
+      continue;  // this is a piece
+
+    DCHECK_LE(vecs_[i].iov_len, buffer_.AppendLen());
+    void* dest = buffer_.AppendBuffer().data();
+    memcpy(dest, vecs_[i].iov_base, vecs_[i].iov_len);
+    buffer_.CommitWrite(vecs_[i].iov_len);
+    vecs_[i].iov_base = dest;
+  }
+  guaranteed_pieces_ = vecs_.size();  // all vecs are pieces now
 }
 
 void SinkReplyBuilder2::NextVec(std::string_view str) {
-  if (vecs_.size() >= IOV_MAX)
+  if (vecs_.size() >= IOV_MAX - 2)
     Flush();
   vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()});
 }
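
Flush() hands the accumulated vecs_ to a single vectorized write on io::Sink. For intuition, a rough standalone equivalent using POSIX writev (not the project's Sink API) looks like this:

#include <string_view>
#include <sys/uio.h>
#include <unistd.h>

int main() {
  // Small protocol pieces would normally live in the backing buffer;
  // the long payload is referenced in place (the WriteRef path).
  std::string_view header = "$5\r\n";
  std::string_view payload = "hello";
  std::string_view crlf = "\r\n";

  iovec vecs[] = {
      {const_cast<char*>(header.data()), header.size()},
      {const_cast<char*>(payload.data()), payload.size()},
      {const_cast<char*>(crlf.data()), crlf.size()},
  };
  ::writev(STDOUT_FILENO, vecs, 3);  // all three chunks go out in one syscall
  return 0;
}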

src/facade/reply_builder.h
@@ -167,34 +167,98 @@ class SinkReplyBuilder {
   bool send_active_ : 1;
 };
 
-// TMP: New version of reply builder that batches not only to a buffer, but also iovecs.
+// Base class for all reply builders. Offers a simple high level interface for controlling output
+// modes and sending basic response types.
 class SinkReplyBuilder2 {
-  explicit SinkReplyBuilder2(io::Sink* sink) : sink_(sink) {
-  }
-
-  // Use with care: All send calls within a scope must keep their data alive!
-  // This allows to fully eliminate copies for batches of data by using vectorized io.
-  struct ReplyScope {
-    explicit ReplyScope(SinkReplyBuilder2* rb) : prev_scoped(rb->scoped_), rb(rb) {
-      rb->scoped_ = true;
-    }
-
-    ~ReplyScope() {
-      if (!prev_scoped) {
-        rb->scoped_ = false;
-        rb->FinishScope();
-      }
-    }
-
-   private:
-    bool prev_scoped;
+  struct GuardBase {
+    bool prev;
     SinkReplyBuilder2* rb;
   };
 
  public:
-  void Write(std::string_view str);
+  constexpr static size_t kMaxInlineSize = 32;
+  constexpr static size_t kMaxBufferSize = 8192;
+
+  explicit SinkReplyBuilder2(io::Sink* sink) : sink_(sink) {
+  }
+
+  // USE WITH CARE! ReplyScope assumes that all string views in Send calls stay valid for the
+  // scope's lifetime. This allows the builder to avoid copies by enqueueing long strings
+  // directly for vectorized io.
+  struct ReplyScope : GuardBase {
+    explicit ReplyScope(SinkReplyBuilder2* rb) : GuardBase{std::exchange(rb->scoped_, true), rb} {
+    }
+    ~ReplyScope();
+  };
+
+  // Aggregator reduces the number of raw send calls by copying data into an intermediate buffer.
+  // Prefer ReplyScope if possible to additionally reduce the number of copies.
+  struct ReplyAggregator : GuardBase {
+    explicit ReplyAggregator(SinkReplyBuilder2* rb)
+        : GuardBase{std::exchange(rb->batched_, true), rb} {
+    }
+    ~ReplyAggregator();
+  };
+
+  void Flush();  // Send all accumulated data and reset to clear state
+
+  std::error_code GetError() const {
+    return ec_;
+  }
+
+  size_t UsedMemory() const {
+    return buffer_.Capacity();
+  }
+
+  bool IsSendActive() {
+    return send_active_;
+  }
+
+  void SetBatchMode(bool b) {
+    batched_ = b;
+  }
+
+  void CloseConnection();
+
+  static const ReplyStats& GetThreadLocalStats() {
+    return tl_facade_stats->reply_stats;
+  }
+
+  static void ResetThreadLocalStats() {
+    tl_facade_stats->reply_stats = {};
+  }
+
+ public:  // High level interface
+  virtual void SendLong(long val) = 0;
+  virtual void SendSimpleString(std::string_view str) = 0;
+
+  virtual void SendStored() = 0;
+  virtual void SendSetSkipped() = 0;
+  void SendOk() {
+    SendSimpleString("OK");
+  }
+
+  virtual void SendError(std::string_view str, std::string_view type = {}) = 0;  // MC and Redis
+  void SendError(OpStatus status);
+  void SendError(ErrorReply error);
+
+  virtual void SendProtocolError(std::string_view str) = 0;
 
  protected:
-  void Flush();  // Send all accumulated data and reset to clear state
+  void WriteI(std::string_view str) {
+    str.size() > kMaxInlineSize ? WriteRef(str) : WritePiece(str);
+  }
+
+  // Constexpr arrays are assumed to be protocol control sequences, stash them as pieces
+  template <size_t S> void WriteI(const char (&arr)[S]) {
+    WritePiece(std::string_view{arr, S - 1});  // we assume null termination
+  }
+
+  template <typename... Args> void Write(Args&&... strs) {
+    (WriteI(strs), ...);
+  }
+
   void FinishScope();  // Called when scope ends
 
   char* ReservePiece(size_t size);  // Reserve size bytes from buffer
@@ -202,19 +266,23 @@ class SinkReplyBuilder2 {
   void WritePiece(std::string_view str);  // Reserve + memcpy + Commit
   void WriteRef(std::string_view str);    // Add iovec bypassing buffer
 
-  bool IsInBuf(const void* ptr) const;  // checks if ptr is part of buffer_
-
   void NextVec(std::string_view str);
 
  private:
   io::Sink* sink_;
   std::error_code ec_;
 
-  bool scoped_;
+  bool send_active_ = false;
+  bool scoped_ = false, batched_ = false;
 
   size_t total_size_ = 0;  // sum of vec_ lengths
-  base::IoBuf buffer_;
-  std::vector<iovec> vecs_;
+  base::IoBuf buffer_;     // backing buffer for pieces
+
+  // Stores iovecs for a single writev call. Can reference either the buffer (WritePiece) or
+  // external data (WriteRef). Validity is ensured by FinishScope, which either flushes before
+  // ref lifetimes end or copies refs into the buffer.
+  absl::InlinedVector<iovec, 16> vecs_;
+  size_t guaranteed_pieces_ = 0;  // length of the prefix of vecs_ guaranteed to be pieces
 };
 
 class MCReplyBuilder : public SinkReplyBuilder {
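
For reference, a hypothetical caller-side sketch of the new guards. The helper function, the facade namespace qualification, and the assumption that a concrete subclass implements the pure virtual Send* methods are mine, not part of this commit:

#include <string>
#include <vector>
// Assumes the header above (declaring facade::SinkReplyBuilder2) is available.

// Hypothetical usage: aggregate many replies, flushing once at the end.
void ReplyWithStrings(facade::SinkReplyBuilder2* rb, const std::vector<std::string>& vals) {
  facade::SinkReplyBuilder2::ReplyAggregator agg(rb);  // enable batching
  for (const auto& v : vals) {
    // v stays alive for the scope, so long strings may be enqueued by reference
    facade::SinkReplyBuilder2::ReplyScope scope(rb);
    rb->SendSimpleString(v);  // copied into the buffer or flushed when the scope ends
  }
}  // outermost guard destructor sends the batch in one vectorized write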