
Dispatch queue backpressure and batching (#1118)

Adds backpressure to pubsub

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav, 2023-04-28 14:02:55 +03:00, committed by GitHub
parent 574ae24fec, commit 432ece3ace
3 changed files with 45 additions and 13 deletions
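Taken together, the mechanism is small: SendAsync adds each message's UsedMemory() to an atomic per-connection counter, the dispatch fiber subtracts it after handling the message and notifies waiters, and EnsureAsyncMemoryBudget blocks producers until the counter is back under kMaxDispatchQMemory (5_MB). Below is a minimal stand-alone sketch of the same producer/consumer byte budget, with std::mutex and std::condition_variable standing in for the fiber-aware dfly::EventCount; ByteBudgetQueue and every name in it are illustrative, not from the codebase.

```cpp
#include <condition_variable>
#include <cstdio>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

// Toy analogue of the commit's backpressure: dispatch_q_bytes_ + evc_bp_ +
// kMaxDispatchQMemory, with condition variables in place of dfly::EventCount.
class ByteBudgetQueue {
 public:
  explicit ByteBudgetQueue(size_t max_bytes) : max_bytes_(max_bytes) {}

  // Counterpart of Connection::EnsureAsyncMemoryBudget(): block the producer
  // until the queued bytes drop back under the budget.
  void EnsureBudget() {
    std::unique_lock lk(mu_);
    budget_cv_.wait(lk, [this] { return bytes_ <= max_bytes_; });
  }

  // Counterpart of SendAsync(): account for the message, then enqueue it.
  void Push(std::string msg) {
    std::unique_lock lk(mu_);
    bytes_ += msg.size();
    q_.push_back(std::move(msg));
    queue_cv_.notify_one();
  }

  // Counterpart of the dispatch fiber: consume one message, release its
  // bytes, and wake producers blocked on the budget.
  std::string Pop() {
    std::unique_lock lk(mu_);
    queue_cv_.wait(lk, [this] { return !q_.empty(); });
    std::string msg = std::move(q_.front());
    q_.pop_front();
    bytes_ -= msg.size();
    budget_cv_.notify_all();
    return msg;
  }

 private:
  const size_t max_bytes_;
  size_t bytes_ = 0;
  std::deque<std::string> q_;
  std::mutex mu_;
  std::condition_variable queue_cv_, budget_cv_;
};

int main() {
  ByteBudgetQueue q(1024);  // tiny budget so the producer actually blocks

  std::thread consumer([&] {
    for (int i = 0; i < 100; ++i) q.Pop();  // slow consumer drives backpressure
  });

  for (int i = 0; i < 100; ++i) {
    q.EnsureBudget();               // blocks while more than 1 KiB is queued
    q.Push(std::string(256, 'x'));  // 256-byte payload
  }
  consumer.join();
  std::puts("done");
}
```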


@@ -78,6 +78,8 @@ bool MatchHttp11Line(string_view line) {
 constexpr size_t kMinReadSize = 256;
 constexpr size_t kMaxReadSize = 32_KB;
 
+constexpr size_t kMaxDispatchQMemory = 5_MB;
+
 thread_local uint32_t free_req_release_weight = 0;
 
 }  // namespace
@@ -173,11 +175,18 @@ template <class... Ts> struct Overloaded : Ts... {
 template <class... Ts> Overloaded(Ts...) -> Overloaded<Ts...>;
 
-size_t Connection::MessageHandle::StorageCapacity() const {
-  auto pub_size = [](const PubMessage& msg) -> size_t { return 0; };
-  auto msg_size = [](const PipelineMessage& arg) -> size_t { return arg.StorageCapacity(); };
-  auto monitor_size = [](const MonitorMessage& arg) -> size_t { return 0; };
-  return visit(Overloaded{pub_size, msg_size, monitor_size}, this->handle);
+size_t Connection::MessageHandle::UsedMemory() const {
+  // TODO: don't count inline size
+  auto pub_size = [](const PubMessage& msg) -> size_t {
+    const auto* md = get_if<PubMessage::MessageData>(&msg.data);
+    return sizeof(PubMessage) + (md ? (md->channel_len + md->message_len) : 0u);
+  };
+  auto msg_size = [](const PipelineMessage& arg) -> size_t {
+    return sizeof(PipelineMessage) + arg.args.capacity() * sizeof(MutableSlice) +
+           arg.storage.capacity();
+  };
+  auto monitor_size = [](const MonitorMessage& arg) -> size_t { return arg.capacity(); };
+  return sizeof(MessageHandle) + visit(Overloaded{pub_size, msg_size, monitor_size}, this->handle);
 }
 
 bool Connection::MessageHandle::IsPipelineMsg() const {
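The new UsedMemory() estimates retained heap rather than payload size: sizeof of the handle itself plus the capacity() of each owned buffer, since capacity, not size, is what the allocator actually committed. A hedged sketch of the same accounting style on an invented record type (Record is not a Dragonfly type):

```cpp
#include <cstdio>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record, analogous in spirit to PipelineMessage: a vector of
// argument slots plus one backing storage buffer.
struct Record {
  std::vector<std::pair<const char*, size_t>> args;
  std::string storage;

  // Mirror the UsedMemory() approach: the footprint of the struct itself plus
  // the *capacity* (allocated bytes) of every owned container.
  size_t UsedMemory() const {
    return sizeof(Record) + args.capacity() * sizeof(args[0]) + storage.capacity();
  }
};

int main() {
  Record r;
  r.storage.reserve(4096);  // capacity counts even before any bytes are written
  r.args.reserve(16);
  std::printf("retained: %zu bytes\n", r.UsedMemory());
}
```

As the TODO in the hunk above notes, this scheme can double-count small buffers stored inline (for example SSO strings), since those bytes already live inside the sizeof term.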
@@ -218,12 +227,9 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
 void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg) {
   ++stats->pipelined_cmd_cnt;
-  self->pipeline_msg_cnt_--;
-  bool do_batch = (self->pipeline_msg_cnt_ > 0);
-  DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front()) << " " << do_batch;
+  DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());
 
-  builder->SetBatchMode(do_batch);
   self->cc_->async_dispatch = true;
   self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
   self->last_interaction_ = time(nullptr);
@@ -568,7 +574,6 @@ auto Connection::ParseRedis() -> ParserStatus {
     } else {
       // Dispatch via queue to speedup input reading.
       SendAsync(MessageHandle{FromArgs(move(tmp_parse_args_), tlh)});
-      ++pipeline_msg_cnt_;
 
       if (dispatch_q_.size() > 10)
         ThisFiber::Yield();
     }
@@ -740,8 +745,14 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
     MessageHandle msg = move(dispatch_q_.front());
     dispatch_q_.pop_front();
 
+    builder->SetBatchMode(dispatch_q_.size() > 0);
+
     std::visit(dispatch_op, msg.handle);
 
+    dispatch_q_bytes_.fetch_sub(msg.UsedMemory(), memory_order_relaxed);
+    evc_bp_.notify();
+
     if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
       if (stats_->pipeline_cache_capacity < request_cache_limit) {
         stats_->pipeline_cache_capacity += (*pipe)->StorageCapacity();
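Batching also changes shape here: instead of a pipeline-only counter, the dispatch fiber enables batch mode exactly when more messages are still queued behind the one being handled, so every message type gets batched and the last reply in a burst triggers the flush. A small self-contained model of that consumer-side policy (Builder is invented for illustration; it is not the facade reply builder):

```cpp
#include <cstdio>
#include <deque>
#include <string>
#include <utility>

// Invented stand-in for the reply builder: buffers output while batch mode is
// on, flushes (think: one write syscall) when a reply arrives with it off.
class Builder {
 public:
  void SetBatchMode(bool on) { batch_ = on; }

  void Send(const std::string& reply) {
    buf_ += reply;
    if (!batch_) {  // last queued message: flush everything at once
      std::printf("flush: %s\n", buf_.c_str());
      buf_.clear();
    }
  }

 private:
  bool batch_ = false;
  std::string buf_;
};

int main() {
  std::deque<std::string> dispatch_q = {"reply-1 ", "reply-2 ", "reply-3 "};
  Builder builder;

  // Same policy as DispatchFiber: after popping a message, batch iff more
  // messages are still waiting, so only the final reply causes a flush.
  while (!dispatch_q.empty()) {
    std::string msg = std::move(dispatch_q.front());
    dispatch_q.pop_front();
    builder.SetBatchMode(!dispatch_q.empty());
    builder.Send(msg);  // handling the message produces a reply
  }
}
```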
@@ -842,12 +853,19 @@ void Connection::SendAsync(MessageHandle msg) {
   if (cc_->conn_closing)
     return;
 
+  dispatch_q_bytes_.fetch_add(msg.UsedMemory(), memory_order_relaxed);
+
   dispatch_q_.push_back(move(msg));
   if (dispatch_q_.size() == 1) {
     evc_.notify();
   }
 }
 
+void Connection::EnsureAsyncMemoryBudget() {
+  evc_bp_.await(
+      [this] { return dispatch_q_bytes_.load(memory_order_relaxed) <= kMaxDispatchQMemory; });
+}
+
 std::string Connection::RemoteEndpointStr() const {
   LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
   bool unix_socket = lsb->IsUDS();


@@ -116,7 +116,7 @@ class Connection : public util::Connection {
   using PubMessagePtr = std::unique_ptr<PubMessage, MessageDeleter>;
 
   struct MessageHandle {
-    size_t StorageCapacity() const;
+    size_t UsedMemory() const;  // How many bytes this handle takes up in total.
 
     bool IsPipelineMsg() const;
@@ -133,6 +133,10 @@ class Connection : public util::Connection {
   // Add monitor message to dispatch queue.
   void SendMonitorMessageAsync(std::string);
 
+  // Must be called before SendAsync to ensure the connection dispatch queue is not overfilled.
+  // Blocks until free space is available.
+  void EnsureAsyncMemoryBudget();
+
   // Register hook that is executed on connection shutdown.
   ShutdownHandle RegisterShutdownHook(ShutdownCb cb);
@@ -212,6 +216,9 @@ class Connection : public util::Connection {
   std::deque<MessageHandle> dispatch_q_;       // dispatch queue
   dfly::EventCount evc_;                       // dispatch queue waker
+  std::atomic_uint64_t dispatch_q_bytes_ = 0;  // memory usage of all entries
+  dfly::EventCount evc_bp_;                    // backpressure for memory limit
+
   base::IoBuf io_buf_;  // used in io loop and parsers
   std::unique_ptr<RedisParser> redis_parser_;
   std::unique_ptr<MemcacheParser> memcache_parser_;
@@ -233,8 +240,6 @@ class Connection : public util::Connection {
   std::unique_ptr<ConnectionContext> cc_;
 
   unsigned parser_error_ = 0;
-  uint32_t pipeline_msg_cnt_ = 0;
-
   uint32_t break_poll_id_ = UINT32_MAX;
   BreakerCb breaker_cb_;


@@ -1470,6 +1470,15 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
   int num_published = subscribers.size();
 
   if (!subscribers.empty()) {
+    // Make sure none of the subscribers' buffers is filled up.
+    // This check doesn't actually reserve any memory ahead and doesn't prevent the buffers
+    // from eventually filling up, especially if multiple clients are unblocked simultaneously,
+    // but it is generally good enough to throttle producers that are too fast.
+    // Most importantly, it lets the dispatch below proceed without blocking or awaiting,
+    // so the backpressure check adds no overhead there.
+    for (auto& sub : subscribers)
+      sub.conn_cntx->owner()->EnsureAsyncMemoryBudget();
+
     auto subscribers_ptr = make_shared<decltype(subscribers)>(move(subscribers));
     auto buf = shared_ptr<char[]>{new char[channel.size() + msg.size()]};
     memcpy(buf.get(), channel.data(), channel.size());
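As the comment concedes, this is a check before dispatch, not a reservation: the budget check and the later byte accounting are separate steps, so publishers unblocked together can each pass the check and push a subscriber's counter past the limit, a bounded overshoot accepted in exchange for a dispatch that never blocks. A toy two-publisher illustration of that overshoot (not Dragonfly code; kLimit, queued_bytes, and PublishOnce are invented):

```cpp
#include <atomic>
#include <cstdio>
#include <thread>

// Illustrative check-then-act budget in the spirit of EnsureAsyncMemoryBudget().
constexpr uint64_t kLimit = 1000;
std::atomic<uint64_t> queued_bytes{0};

void PublishOnce(uint64_t msg_bytes) {
  // Budget check (analogue of evc_bp_.await): passes whenever we are at or
  // under the limit, even if this message will push us past it.
  while (queued_bytes.load(std::memory_order_relaxed) > kLimit)
    std::this_thread::yield();
  // Enqueue accounting (analogue of SendAsync's fetch_add).
  queued_bytes.fetch_add(msg_bytes, std::memory_order_relaxed);
}

int main() {
  // Both publishers pass the check (900 <= 1000 holds for each), yet together
  // they queue 1800 bytes, past the 1000-byte budget, by one message's worth.
  std::thread a(PublishOnce, 900), b(PublishOnce, 900);
  a.join();
  b.join();
  std::printf("queued: %llu bytes (limit %llu)\n",
              (unsigned long long)queued_bytes.load(),
              (unsigned long long)kLimit);
}
```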