From 2dfd0c07aea7221a9b817beb23fcaf82cf625da6 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sat, 2 Nov 2024 23:20:28 +0200 Subject: [PATCH] chore: add an option to start using iouring provided buffers iouring allows registering a pool of predefined buffers. Then during the recv operation the kernel will choose a buffer from the pool, copy data into it and return it to the application. This is in contrast to preallocated buffers that need to be passed to a regular Recv. So, for example, if we have 10000 connections, today we preallocate 10000 buffers, even though we may have only 100 in-flight requests. This PR does not retire iobufs, it just provides basic wiring to start using provided buffers interface. Signed-off-by: Roman Gershman --- src/facade/dragonfly_connection.cc | 70 ++++++++++++++++++++++-------- src/facade/dragonfly_connection.h | 6 ++- src/facade/facade_types.h | 4 ++ src/server/dfly_main.cc | 32 +++++++++++++- 4 files changed, 91 insertions(+), 21 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 995e4b6e0..89dda70f1 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -31,6 +31,7 @@ #ifdef __linux__ #include "util/fibers/uring_file.h" +#include "util/fibers/uring_proactor.h" #endif using namespace std; @@ -87,6 +88,7 @@ ABSL_FLAG(bool, migrate_connections, true, "happen at most once per connection."); using namespace util; +using namespace std; using absl::GetFlag; using nonstd::make_unexpected; @@ -239,7 +241,7 @@ void LogTraffic(uint32_t id, bool has_more, absl::Span resp, } // Write the data itself. - std::array blobs; + array blobs; unsigned index = 0; if (next != stack_buf) { blobs[index++] = iovec{.iov_base = stack_buf, .iov_len = size_t(next - stack_buf)}; @@ -283,7 +285,7 @@ struct Connection::QueueBackpressure { // Used by publisher/subscriber actors to make sure we do not publish too many messages // into the queue. 
Thread-safe to allow safe access in EnsureBelowLimit. util::fb2::EventCount pubsub_ec; - std::atomic_size_t subscriber_bytes = 0; + atomic_size_t subscriber_bytes = 0; // Used by pipelining/execution fiber to throttle the incoming pipeline messages. // Used together with pipeline_buffer_limit to limit the pipeline usage per thread. @@ -659,6 +661,12 @@ void Connection::OnConnectionStart() { queue_backpressure_ = &tl_queue_backpressure_; stats_ = &tl_facade_stats->conn_stats; + if (socket_->proactor()->GetKind() == ProactorBase::IOURING) { +#ifdef __linux__ + auto* up = static_cast(socket_->proactor()); + recv_provided_ = up->BufRingEntrySize(kRecvSockGid) > 0; +#endif + } } void Connection::HandleRequests() { @@ -705,6 +713,7 @@ void Connection::HandleRequests() { LOG(WARNING) << "Error handshaking " << aresult.error().message(); return; } + is_tls_ = 1; VLOG(1) << "TLS handshake succeeded"; } } @@ -1243,6 +1252,43 @@ void Connection::HandleMigrateRequest() { } } +error_code Connection::HandleRecvSocket() { + io::MutableBytes append_buf = io_buf_.AppendBuffer(); + DCHECK(!append_buf.empty()); + + phase_ = READ_SOCKET; + error_code ec; + + size_t commit_sz = 0; + if (!is_tls_ && recv_provided_ && append_buf.size() >= kRecvBufSize) { + FiberSocketBase::ProvidedBuffer pb[1]; + unsigned res = socket_->RecvProvided(1, pb); + CHECK_EQ(res, 1u); + if (pb[0].buffer.empty()) { + return error_code{pb[0].err_no, system_category()}; + } + + memcpy(append_buf.data(), pb[0].buffer.data(), pb[0].buffer.size()); + commit_sz = pb[0].buffer.size(); + socket_->ReturnProvided(pb[0]); + } else { + ::io::Result recv_sz = socket_->Recv(append_buf); + last_interaction_ = time(nullptr); + + if (!recv_sz) { + return recv_sz.error(); + } + + commit_sz = *recv_sz; + } + + io_buf_.CommitWrite(commit_sz); + stats_->io_read_bytes += commit_sz; + ++stats_->io_read_cnt; + + return ec; +} + auto Connection::IoLoop() -> variant { error_code ec; ParserStatus parse_status = OK; @@ -1252,25 +1298,11 @@ 
auto Connection::IoLoop() -> variant { do { HandleMigrateRequest(); - - io::MutableBytes append_buf = io_buf_.AppendBuffer(); - DCHECK(!append_buf.empty()); - - phase_ = READ_SOCKET; - - ::io::Result recv_sz = peer->Recv(append_buf); - last_interaction_ = time(nullptr); - - if (!recv_sz) { - ec = recv_sz.error(); - parse_status = OK; - break; + ec = HandleRecvSocket(); + if (ec) { + return ec; } - io_buf_.CommitWrite(*recv_sz); - stats_->io_read_bytes += *recv_sz; - ++stats_->io_read_cnt; - phase_ = PROCESS; bool is_iobuf_full = io_buf_.AppendLen() == 0; diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index bc1af66c3..2435de4d4 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -354,7 +354,7 @@ class Connection : public util::Connection { void SendAsync(MessageHandle msg); - // Updates memory stats and pooling, must be called for all used messages void RecycleMessage(MessageHandle msg); // Create new pipeline request, re-use from pool when possible. @@ -372,6 +372,8 @@ class Connection : public util::Connection { PipelineMessagePtr GetFromPipelinePool(); void HandleMigrateRequest(); + std::error_code HandleRecvSocket(); + bool ShouldEndDispatchFiber(const MessageHandle& msg); void LaunchDispatchFiberIfNeeded(); // Dispatch fiber is started lazily @@ -458,6 +460,8 @@ class Connection : public util::Connection { bool migration_enabled_ : 1; bool migration_in_process_ : 1; bool is_http_ : 1; + bool is_tls_ : 1; + bool recv_provided_ : 1; }; }; }; diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 1dbcc9087..d62ce8e64 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -185,6 +185,10 @@ extern __thread FacadeStats* tl_facade_stats; void ResetStats(); +// Constants for socket bufring. 
+constexpr uint16_t kRecvSockGid = 0; +constexpr size_t kRecvBufSize = 128; + } // namespace facade namespace std { diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index 5caa7cdc1..7083f9181 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -23,7 +23,7 @@ #endif #ifdef __linux__ -#include +#include "util/fibers/uring_proactor.h" #endif #include @@ -81,6 +81,9 @@ ABSL_FLAG(bool, version_check, true, "If true, Will monitor for new releases on Dragonfly servers once a day."); ABSL_FLAG(uint16_t, tcp_backlog, 256, "TCP listen(2) backlog parameter."); +ABSL_FLAG(uint16_t, recv_sock_buffers, 0, + "How many socket recv buffers to allocate per thread. " + "Relevant only for modern kernels with io_uring enabled"); using namespace util; using namespace facade; @@ -600,6 +603,32 @@ void SetupAllocationTracker(ProactorPool* pool) { #endif } +void RegisterBufRings(ProactorPool* pool) { +#ifdef __linux__ + auto bufcnt = absl::GetFlag(FLAGS_recv_sock_buffers); + if (bufcnt == 0) { + return; + } + + if (dfly::kernel_version < 602 || pool->at(0)->GetKind() != ProactorBase::IOURING) { + LOG(WARNING) << "recv_sock_buffers is only supported on kernels >= 6.2 and io_uring proactor"; + return; + } + + bufcnt = absl::bit_ceil(bufcnt); + pool->AwaitBrief([&](unsigned, ProactorBase* pb) { + auto up = static_cast(pb); + int res = up->RegisterBufferRing(facade::kRecvSockGid, bufcnt, facade::kRecvBufSize); + if (res != 0) { + LOG(ERROR) << "Failed to register buf ring for proactor " + << util::detail::SafeErrorMessage(res); + exit(1); + } + }); + +#endif +} + } // namespace } // namespace dfly @@ -787,6 +816,7 @@ Usage: dragonfly [FLAGS] pool->Run(); SetupAllocationTracker(pool.get()); + RegisterBufRings(pool.get()); AcceptServer acceptor(pool.get(), &fb2::std_malloc_resource, true); acceptor.set_back_log(absl::GetFlag(FLAGS_tcp_backlog));