1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: add an option to start using iouring provided buffers

iouring allows to register a pool of predefined buffers.
then during the recv operation the kernel will choose a buffer from the pool, copy data into it
and return it to the application. This is in contrast to preallocated buffers that need to be passed to
a regular Recv. So, for example, if we have 10000 connections, today we preallocate 10000 buffers,
even though we may have only 100 in-flight requests.

This PR does not retire iobufs, it just provides basic wiring to start using provided buffers interface.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-11-02 23:20:28 +02:00
parent 75c961e7ed
commit 2dfd0c07ae
No known key found for this signature in database
GPG key ID: 6568CCAB9736B618
4 changed files with 91 additions and 21 deletions

View file

@ -31,6 +31,7 @@
#ifdef __linux__
#include "util/fibers/uring_file.h"
#include "util/fibers/uring_proactor.h"
#endif
using namespace std;
@ -87,6 +88,7 @@ ABSL_FLAG(bool, migrate_connections, true,
"happen at most once per connection.");
using namespace util;
using namespace std;
using absl::GetFlag;
using nonstd::make_unexpected;
@ -239,7 +241,7 @@ void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp,
}
// Write the data itself.
std::array<iovec, 16> blobs;
array<iovec, 16> blobs;
unsigned index = 0;
if (next != stack_buf) {
blobs[index++] = iovec{.iov_base = stack_buf, .iov_len = size_t(next - stack_buf)};
@ -283,7 +285,7 @@ struct Connection::QueueBackpressure {
// Used by publisher/subscriber actors to make sure we do not publish too many messages
// into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
util::fb2::EventCount pubsub_ec;
std::atomic_size_t subscriber_bytes = 0;
atomic_size_t subscriber_bytes = 0;
// Used by pipelining/execution fiber to throttle the incoming pipeline messages.
// Used together with pipeline_buffer_limit to limit the pipeline usage per thread.
@ -659,6 +661,12 @@ void Connection::OnConnectionStart() {
queue_backpressure_ = &tl_queue_backpressure_;
stats_ = &tl_facade_stats->conn_stats;
if (socket_->proactor()->GetKind() == ProactorBase::IOURING) {
#ifdef __linux__
auto* up = static_cast<fb2::UringProactor*>(socket_->proactor());
recv_provided_ = up->BufRingEntrySize(kRecvSockGid) > 0;
#endif
}
}
void Connection::HandleRequests() {
@ -705,6 +713,7 @@ void Connection::HandleRequests() {
LOG(WARNING) << "Error handshaking " << aresult.error().message();
return;
}
is_tls_ = 1;
VLOG(1) << "TLS handshake succeeded";
}
}
@ -1243,6 +1252,43 @@ void Connection::HandleMigrateRequest() {
}
}
// Reads the next chunk of socket data into io_buf_ and updates read stats.
// Uses the io_uring provided-buffer pool when available (non-TLS, bufring
// registered, and enough room in io_buf_ to absorb a full kRecvBufSize buffer);
// otherwise falls back to a regular Recv into io_buf_'s append area.
// Returns a default-constructed error_code on success.
error_code Connection::HandleRecvSocket() {
  io::MutableBytes append_buf = io_buf_.AppendBuffer();
  DCHECK(!append_buf.empty());

  phase_ = READ_SOCKET;

  size_t commit_sz = 0;
  if (!is_tls_ && recv_provided_ && append_buf.size() >= kRecvBufSize) {
    // Provided-buffer path: the kernel picks a buffer from the pre-registered
    // pool; we copy it into io_buf_ and return it to the pool right away.
    FiberSocketBase::ProvidedBuffer pb[1];
    unsigned res = socket_->RecvProvided(1, pb);
    CHECK_EQ(res, 1u);

    // Keep idle-time accounting consistent with the Recv() path below.
    last_interaction_ = time(nullptr);

    if (pb[0].buffer.empty()) {
      return error_code{pb[0].err_no, system_category()};
    }

    // Safe: the branch condition guarantees append_buf can hold the buffer.
    memcpy(append_buf.data(), pb[0].buffer.data(), pb[0].buffer.size());
    commit_sz = pb[0].buffer.size();
    socket_->ReturnProvided(pb[0]);
  } else {
    ::io::Result<size_t> recv_sz = socket_->Recv(append_buf);
    last_interaction_ = time(nullptr);
    if (!recv_sz) {
      return recv_sz.error();
    }
    commit_sz = *recv_sz;
  }

  io_buf_.CommitWrite(commit_sz);
  stats_->io_read_bytes += commit_sz;
  ++stats_->io_read_cnt;

  return {};
}
auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
error_code ec;
ParserStatus parse_status = OK;
@ -1252,25 +1298,11 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
do {
HandleMigrateRequest();
io::MutableBytes append_buf = io_buf_.AppendBuffer();
DCHECK(!append_buf.empty());
phase_ = READ_SOCKET;
::io::Result<size_t> recv_sz = peer->Recv(append_buf);
last_interaction_ = time(nullptr);
if (!recv_sz) {
ec = recv_sz.error();
parse_status = OK;
break;
ec = HandleRecvSocket();
if (ec) {
return ec;
}
io_buf_.CommitWrite(*recv_sz);
stats_->io_read_bytes += *recv_sz;
++stats_->io_read_cnt;
phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;

View file

@ -354,7 +354,7 @@ class Connection : public util::Connection {
void SendAsync(MessageHandle msg);
// Updates memory stats and pooling, must be called for all used messages
// Updates memory stats and pooling, must be called for all used messages
void RecycleMessage(MessageHandle msg);
// Create new pipeline request, re-use from pool when possible.
@ -372,6 +372,8 @@ class Connection : public util::Connection {
PipelineMessagePtr GetFromPipelinePool();
void HandleMigrateRequest();
std::error_code HandleRecvSocket();
bool ShouldEndDispatchFiber(const MessageHandle& msg);
void LaunchDispatchFiberIfNeeded(); // Dispatch fiber is started lazily
@ -458,6 +460,8 @@ class Connection : public util::Connection {
bool migration_enabled_ : 1;
bool migration_in_process_ : 1;
bool is_http_ : 1;
bool is_tls_ : 1;
bool recv_provided_ : 1;
};
};
};

View file

@ -185,6 +185,10 @@ extern __thread FacadeStats* tl_facade_stats;
void ResetStats();
// Constants for socket bufring.
constexpr uint16_t kRecvSockGid = 0;
constexpr size_t kRecvBufSize = 128;
} // namespace facade
namespace std {

View file

@ -23,7 +23,7 @@
#endif
#ifdef __linux__
#include <liburing.h>
#include "util/fibers/uring_proactor.h"
#endif
#include <mimalloc.h>
@ -81,6 +81,9 @@ ABSL_FLAG(bool, version_check, true,
"If true, Will monitor for new releases on Dragonfly servers once a day.");
ABSL_FLAG(uint16_t, tcp_backlog, 256, "TCP listen(2) backlog parameter.");
ABSL_FLAG(uint16_t, recv_sock_buffers, 0,
"How many socket recv buffers to allocate per thread."
"Relevant only for modern kernels with io_uring enabled");
using namespace util;
using namespace facade;
@ -600,6 +603,32 @@ void SetupAllocationTracker(ProactorPool* pool) {
#endif
}
// Registers an io_uring provided-buffer ring (group kRecvSockGid, buffers of
// kRecvBufSize bytes) on every proactor thread, per the recv_sock_buffers flag.
// No-op when the flag is 0, on non-Linux builds, on kernels older than 6.2, or
// when the proactor is not io_uring. Exits the process if registration fails.
void RegisterBufRings(ProactorPool* pool) {
#ifdef __linux__
  // Widen to 32 bits: bit_ceil of a uint16_t value above 32768 rounds up to
  // 65536, which does not fit back into uint16_t.
  uint32_t bufcnt = absl::GetFlag(FLAGS_recv_sock_buffers);
  if (bufcnt == 0) {
    return;
  }

  // kernel_version encodes major*100 + minor, so 602 corresponds to 6.2.
  if (dfly::kernel_version < 602 || pool->at(0)->GetKind() != ProactorBase::IOURING) {
    LOG(WARNING) << "recv_sock_buffers is only supported on kernels >= 6.2 and io_uring proactor";
    return;
  }

  // Buffer ring sizes must be a power of two.
  bufcnt = absl::bit_ceil(bufcnt);
  pool->AwaitBrief([&](unsigned, ProactorBase* pb) {
    auto* up = static_cast<fb2::UringProactor*>(pb);
    int res = up->RegisterBufferRing(facade::kRecvSockGid, bufcnt, facade::kRecvBufSize);
    if (res != 0) {
      LOG(ERROR) << "Failed to register buf ring for proactor "
                 << util::detail::SafeErrorMessage(res);
      exit(1);
    }
  });
#endif
}
} // namespace
} // namespace dfly
@ -787,6 +816,7 @@ Usage: dragonfly [FLAGS]
pool->Run();
SetupAllocationTracker(pool.get());
RegisterBufRings(pool.get());
AcceptServer acceptor(pool.get(), &fb2::std_malloc_resource, true);
acceptor.set_back_log(absl::GetFlag(FLAGS_tcp_backlog));