From 432ece3ace3f1f90963b290730d15b9310247fb4 Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Fri, 28 Apr 2023 14:02:55 +0300
Subject: [PATCH] Dispatch queue backpressure and batching (#1118)

Adds backpressure to pubsub

Signed-off-by: Vladislav Oleshko
---
 src/facade/dragonfly_connection.cc | 38 ++++++++++++++++++++++--------
 src/facade/dragonfly_connection.h  | 11 ++++++---
 src/server/main_service.cc         |  9 +++++++
 3 files changed, 45 insertions(+), 13 deletions(-)
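Note on the mechanism: every queued MessageHandle now reports its UsedMemory(); SendAsync()
adds that to dispatch_q_bytes_, the dispatch fiber subtracts it after handling a message and
wakes evc_bp_, and EnsureAsyncMemoryBudget() blocks a producer while the total exceeds
kMaxDispatchQMemory (5 MB). The sketch below shows the same scheme as a self-contained class;
it is illustrative only, using std::condition_variable and a mutex where the patch uses
dfly::EventCount and an atomic counter (atomic because a publisher may check the budget from
a different thread than the one draining the queue). DispatchQueue and its method names are
stand-ins, not Dragonfly code.

// Illustrative sketch only -- not Dragonfly code. Mirrors the patch's scheme:
// SendAsync() accounts bytes in, the consumer accounts them out and wakes
// blocked producers, EnsureMemoryBudget() enforces the cap.
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

class DispatchQueue {
 public:
  // Mirrors kMaxDispatchQMemory from the patch.
  static constexpr size_t kMaxQueueMemory = 5 * 1024 * 1024;

  // Producer: block until total queued bytes drop under the cap
  // (the patch's Connection::EnsureAsyncMemoryBudget).
  void EnsureMemoryBudget() {
    std::unique_lock lk(mu_);
    cv_bp_.wait(lk, [this] { return bytes_ <= kMaxQueueMemory; });
  }

  // Producer: account for the message size, then enqueue and wake the
  // consumer (the patch's Connection::SendAsync).
  void SendAsync(std::string msg) {
    std::unique_lock lk(mu_);
    bytes_ += msg.size();
    q_.push_back(std::move(msg));
    cv_.notify_one();
  }

  // Consumer: dequeue one message, release its memory budget and wake any
  // producer blocked in EnsureMemoryBudget (the patch's DispatchFiber).
  std::string PopFront() {
    std::unique_lock lk(mu_);
    cv_.wait(lk, [this] { return !q_.empty(); });
    std::string msg = std::move(q_.front());
    q_.pop_front();
    bytes_ -= msg.size();
    cv_bp_.notify_all();
    return msg;
  }

  bool Empty() {
    std::unique_lock lk(mu_);
    return q_.empty();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;     // queue became non-empty (evc_ in the patch)
  std::condition_variable cv_bp_;  // memory budget freed (evc_bp_ in the patch)
  std::deque<std::string> q_;
  size_t bytes_ = 0;
};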
diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 9f238dd7e..f29dbf62a 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -78,6 +78,8 @@ bool MatchHttp11Line(string_view line) {
 constexpr size_t kMinReadSize = 256;
 constexpr size_t kMaxReadSize = 32_KB;
 
+constexpr size_t kMaxDispatchQMemory = 5_MB;
+
 thread_local uint32_t free_req_release_weight = 0;
 
 }  // namespace
@@ -173,11 +175,18 @@ template <class... Ts> struct Overloaded : Ts... {
 template <class... Ts> Overloaded(Ts...) -> Overloaded<Ts...>;
 
-size_t Connection::MessageHandle::StorageCapacity() const {
-  auto pub_size = [](const PubMessage& msg) -> size_t { return 0; };
-  auto msg_size = [](const PipelineMessage& arg) -> size_t { return arg.StorageCapacity(); };
-  auto monitor_size = [](const MonitorMessage& arg) -> size_t { return 0; };
-  return visit(Overloaded{pub_size, msg_size, monitor_size}, this->handle);
+size_t Connection::MessageHandle::UsedMemory() const {
+  // TODO: don't count inline size
+  auto pub_size = [](const PubMessage& msg) -> size_t {
+    const auto* md = get_if<PubMessage::MessageData>(&msg.data);
+    return sizeof(PubMessage) + (md ? (md->channel_len + md->message_len) : 0u);
+  };
+  auto msg_size = [](const PipelineMessage& arg) -> size_t {
+    return sizeof(PipelineMessage) + arg.args.capacity() * sizeof(MutableSlice) +
+           arg.storage.capacity();
+  };
+  auto monitor_size = [](const MonitorMessage& arg) -> size_t { return arg.capacity(); };
+  return sizeof(MessageHandle) + visit(Overloaded{pub_size, msg_size, monitor_size}, this->handle);
 }
 
 bool Connection::MessageHandle::IsPipelineMsg() const {
@@ -218,12 +227,9 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
 
 void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg) {
   ++stats->pipelined_cmd_cnt;
 
-  self->pipeline_msg_cnt_--;
-  bool do_batch = (self->pipeline_msg_cnt_ > 0);
-  DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front()) << " " << do_batch;
+  DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());
 
-  builder->SetBatchMode(do_batch);
   self->cc_->async_dispatch = true;
   self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
   self->last_interaction_ = time(nullptr);
@@ -568,7 +574,6 @@ auto Connection::ParseRedis() -> ParserStatus {
     } else {
       // Dispatch via queue to speed up input reading.
       SendAsync(MessageHandle{FromArgs(move(tmp_parse_args_), tlh)});
-      ++pipeline_msg_cnt_;
       if (dispatch_q_.size() > 10)
         ThisFiber::Yield();
     }
@@ -740,8 +745,14 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
     MessageHandle msg = move(dispatch_q_.front());
     dispatch_q_.pop_front();
 
+    builder->SetBatchMode(dispatch_q_.size() > 0);
+
     std::visit(dispatch_op, msg.handle);
 
+    dispatch_q_bytes_.fetch_sub(msg.UsedMemory(), memory_order_relaxed);
+    evc_bp_.notify();
+
     if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
       if (stats_->pipeline_cache_capacity < request_cache_limit) {
         stats_->pipeline_cache_capacity += (*pipe)->StorageCapacity();
@@ -842,12 +853,19 @@ void Connection::SendAsync(MessageHandle msg) {
   if (cc_->conn_closing)
     return;
 
+  dispatch_q_bytes_.fetch_add(msg.UsedMemory(), memory_order_relaxed);
+
   dispatch_q_.push_back(move(msg));
   if (dispatch_q_.size() == 1) {
     evc_.notify();
   }
 }
 
+void Connection::EnsureAsyncMemoryBudget() {
+  evc_bp_.await(
+      [this] { return dispatch_q_bytes_.load(memory_order_relaxed) <= kMaxDispatchQMemory; });
+}
+
 std::string Connection::RemoteEndpointStr() const {
   LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
   bool unix_socket = lsb->IsUDS();
diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index 4427ec99c..e03868848 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -116,7 +116,7 @@ class Connection : public util::Connection {
   using PubMessagePtr = std::unique_ptr<PubMessage>;
 
   struct MessageHandle {
-    size_t StorageCapacity() const;
+    size_t UsedMemory() const;  // How many bytes this handle takes up in total.
 
     bool IsPipelineMsg() const;
@@ -133,6 +133,10 @@
   // Add monitor message to dispatch queue.
   void SendMonitorMessageAsync(std::string);
 
+  // Must be called before SendAsync to ensure the connection dispatch queue is not overfilled.
+  // Blocks until free space is available.
+  void EnsureAsyncMemoryBudget();
+
   // Register hook that is executed on connection shutdown.
   ShutdownHandle RegisterShutdownHook(ShutdownCb cb);
@@ -212,6 +216,9 @@
   std::deque<MessageHandle> dispatch_q_;  // dispatch queue
   dfly::EventCount evc_;                  // dispatch queue waker
 
+  std::atomic_uint64_t dispatch_q_bytes_ = 0;  // memory usage of all entries
+  dfly::EventCount evc_bp_;                    // backpressure for memory limit
+
   base::IoBuf io_buf_;  // used in io loop and parsers
   std::unique_ptr<RedisParser> redis_parser_;
   std::unique_ptr<MemcacheParser> memcache_parser_;
@@ -233,8 +240,6 @@
   std::unique_ptr<ConnectionContext> cc_;
 
   unsigned parser_error_ = 0;
-  uint32_t pipeline_msg_cnt_ = 0;
-
   uint32_t break_poll_id_ = UINT32_MAX;
 
   BreakerCb breaker_cb_;
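Note on the batching half of the change: the per-connection pipeline_msg_cnt_ counter is gone,
and DispatchFiber instead derives batch mode from queue occupancy (dispatch_q_.size() > 0)
before dispatching any message type, so pubsub and monitor messages now batch just like
pipelined commands. A usage sketch for the illustrative DispatchQueue above, with the same
batch decision; the empty-string stop marker is an ad-hoc device for the example, not part of
the patch:

// Usage sketch for the DispatchQueue above. The batch decision mirrors
// builder->SetBatchMode(dispatch_q_.size() > 0): keep replies buffered
// while more messages are already queued, flush on the last one.
#include <iostream>
#include <thread>

int main() {
  DispatchQueue q;

  std::thread producer([&q] {
    for (int i = 0; i < 100; ++i) {
      q.EnsureMemoryBudget();  // backpressure: wait while 5 MB are pending
      q.SendAsync(std::string(64 * 1024, 'x'));
    }
    q.SendAsync("");  // empty message used here as a stop marker
  });

  for (;;) {
    std::string msg = q.PopFront();
    if (msg.empty())
      break;
    bool batch = !q.Empty();  // more work queued -> defer the flush
    std::cout << "handled " << msg.size() << " bytes, batch=" << batch << '\n';
  }
  producer.join();
}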
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 402874fe0..a0d5f0bef 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -1470,6 +1470,15 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
   int num_published = subscribers.size();
 
   if (!subscribers.empty()) {
+    // Make sure none of the subscribers' buffers is filled up.
+    // This check doesn't actually reserve any memory ahead and doesn't prevent the buffers
+    // from eventually filling up, especially if multiple clients are unblocked simultaneously,
+    // but it is generally good enough to throttle producers that are too fast.
+    // Most importantly, this approach lets us avoid blocking and awaiting in the dispatch below,
+    // so the dispatch itself adds no backpressure overhead.
+    for (auto& sub : subscribers)
+      sub.conn_cntx->owner()->EnsureAsyncMemoryBudget();
+
     auto subscribers_ptr = make_shared<decltype(subscribers)>(move(subscribers));
     auto buf = shared_ptr<char[]>{new char[channel.size() + msg.size()]};
     memcpy(buf.get(), channel.data(), channel.size());
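The comment block above is the crux of the pubsub change: backpressure runs as a separate pass
over the subscribers before the fan-out, so the dispatch loop itself never blocks. In terms of
the earlier sketch the shape is as follows; Subscriber and FanOut are hypothetical stand-ins
for the subscriber list and Service::Publish, reusing the illustrative DispatchQueue from the
top of this patch description:

// Two-phase fan-out, following the comment in Service::Publish above.
#include <string>
#include <vector>

struct Subscriber {
  DispatchQueue* queue;  // stands in for sub.conn_cntx->owner()
};

void FanOut(std::vector<Subscriber>& subs, const std::string& msg) {
  // Phase 1: wait until every subscriber is under its memory cap.
  // Nothing is reserved, so concurrent publishers can still overshoot
  // the cap briefly; the point is to throttle sustained fast producers.
  for (auto& sub : subs)
    sub.queue->EnsureMemoryBudget();

  // Phase 2: enqueue for everyone. SendAsync never blocks, so this loop
  // adds no backpressure overhead to the dispatch itself.
  for (auto& sub : subs)
    sub.queue->SendAsync(msg);
}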