1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore(tiering): Remove IoMgr (#3198)

This commit is contained in:
Vladislav 2024-06-20 20:27:24 +03:00 committed by GitHub
parent e5fafdd5ad
commit 90edcee534
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 71 additions and 197 deletions

View file

@ -15,7 +15,7 @@ set_property(SOURCE dfly_main.cc APPEND PROPERTY COMPILE_DEFINITIONS
if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
SET(TX_LINUX_SRCS tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc
tiering/io_mgr.cc tiering/external_alloc.cc)
tiering/external_alloc.cc)
add_executable(dfly_bench dfly_bench.cc)
cxx_link(dfly_bench dfly_facade fibers2 absl::random_random)

View file

@ -15,6 +15,8 @@
using namespace ::dfly::tiering::literals;
ABSL_FLAG(bool, backing_file_direct, false, "If true uses O_DIRECT to open backing files");
ABSL_FLAG(uint64_t, registered_buffer_size, 512_KB,
"Size of registered buffer for IoUring fixed read/writes");
@ -57,18 +59,42 @@ void ReturnBuf(UringBuf buf) {
DestroyTmpBuf(buf);
}
constexpr off_t kInitialSize = 1UL << 28; // 256MB
template <typename... Ts> std::error_code DoFiberCall(void (SubmitEntry::*c)(Ts...), Ts... args) {
auto* proactor = static_cast<UringProactor*>(ProactorBase::me());
FiberCall fc(proactor);
(fc.operator->()->*c)(std::forward<Ts>(args)...);
FiberCall::IoResult io_res = fc.Get();
return io_res < 0 ? std::error_code{-io_res, std::system_category()} : std::error_code{};
}
} // anonymous namespace
DiskStorage::DiskStorage(size_t max_size) : max_size_(max_size) {
}
std::error_code DiskStorage::Open(std::string_view path) {
RETURN_ON_ERR(io_mgr_.Open(path));
alloc_.AddStorage(0, io_mgr_.Span());
DCHECK_EQ(ProactorBase::me()->GetKind(), ProactorBase::IOURING);
auto* up = static_cast<UringProactor*>(ProactorBase::me());
CHECK(!backing_file_);
int kFlags = O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC;
if (absl::GetFlag(FLAGS_backing_file_direct))
kFlags |= O_DIRECT;
auto res = OpenLinux(path, kFlags, 0666);
if (!res)
return res.error();
backing_file_ = std::move(res.value());
int fd = backing_file_->fd();
RETURN_ON_ERR(DoFiberCall(&SubmitEntry::PrepFallocate, fd, 0, 0L, kInitialSize));
RETURN_ON_ERR(DoFiberCall(&SubmitEntry::PrepFadvise, fd, 0L, 0L, POSIX_FADV_RANDOM));
size_ = kInitialSize;
alloc_.AddStorage(0, size_);
auto* up = static_cast<UringProactor*>(ProactorBase::me());
if (int io_res = up->RegisterBuffers(absl::GetFlag(FLAGS_registered_buffer_size)); io_res < 0)
return std::error_code{-io_res, std::system_category()};
@ -77,10 +103,11 @@ std::error_code DiskStorage::Open(std::string_view path) {
void DiskStorage::Close() {
using namespace std::chrono_literals;
while (pending_ops_ > 0)
while (pending_ops_ > 0 || grow_pending_)
util::ThisFiber::SleepFor(10ms);
io_mgr_.Shutdown();
backing_file_->Close();
backing_file_.reset();
}
void DiskStorage::Read(DiskSegment segment, ReadCb cb) {
@ -98,7 +125,10 @@ void DiskStorage::Read(DiskSegment segment, ReadCb cb) {
};
pending_ops_++;
io_mgr_.ReadAsync(segment.offset, buf, std::move(io_cb));
if (buf.buf_idx)
backing_file_->ReadFixedAsync(buf.bytes, segment.offset, *buf.buf_idx, std::move(io_cb));
else
backing_file_->ReadAsync(buf.bytes, segment.offset, std::move(io_cb));
}
void DiskStorage::MarkAsFree(DiskSegment segment) {
@ -115,17 +145,9 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
// If we've run out of space, block and grow as much as needed
if (offset < 0) {
size_t start = io_mgr_.Span();
size_t grow_size = -offset;
RETURN_ON_ERR(Grow(-offset));
if (alloc_.capacity() + grow_size >= max_size_)
return std::make_error_code(std::errc::no_space_on_device);
RETURN_ON_ERR(io_mgr_.Grow(grow_size));
alloc_.AddStorage(start, grow_size);
offset = alloc_.Malloc(bytes.size());
if (offset < 0) // we can't fit it even after resizing
return std::make_error_code(std::errc::file_too_large);
}
@ -145,7 +167,10 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
};
pending_ops_++;
io_mgr_.WriteAsync(offset, buf, std::move(io_cb));
if (buf.buf_idx)
backing_file_->WriteFixedAsync(buf.bytes, offset, *buf.buf_idx, std::move(io_cb));
else
backing_file_->WriteAsync(buf.bytes, offset, std::move(io_cb));
return {};
}
@ -153,4 +178,22 @@ DiskStorage::Stats DiskStorage::GetStats() const {
return {alloc_.allocated_bytes(), alloc_.capacity()};
}
std::error_code DiskStorage::Grow(off_t grow_size) {
off_t start = size_;
if (off_t(alloc_.capacity()) + grow_size >= max_size_)
return std::make_error_code(std::errc::no_space_on_device);
if (std::exchange(grow_pending_, true))
return std::make_error_code(std::errc::operation_in_progress);
auto err = DoFiberCall(&SubmitEntry::PrepFallocate, backing_file_->fd(), 0, size_, grow_size);
grow_pending_ = false;
RETURN_ON_ERR(err);
size_ += grow_size;
alloc_.AddStorage(start, grow_size);
return {};
}
} // namespace dfly::tiering

View file

@ -4,13 +4,12 @@
#pragma once
#include <string>
#include <system_error>
#include "io/io.h"
#include "server/tiering/common.h"
#include "server/tiering/external_alloc.h"
#include "server/tiering/io_mgr.h"
#include "util/fibers/uring_file.h"
namespace dfly::tiering {
@ -45,9 +44,14 @@ class DiskStorage {
Stats GetStats() const;
private:
size_t pending_ops_ = 0;
size_t max_size_;
IoMgr io_mgr_;
std::error_code Grow(off_t grow_size);
private:
off_t size_, max_size_;
size_t pending_ops_ = 0; // number of ongoing ops for safe shutdown
bool grow_pending_ = false;
std::unique_ptr<util::fb2::LinuxFile> backing_file_;
ExternalAllocator alloc_;
};

View file

@ -1,117 +0,0 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/tiering/io_mgr.h"
#include <fcntl.h>
#include <mimalloc.h>
#include "base/flags.h"
#include "base/logging.h"
#include "facade/facade_types.h"
#include "server/tiering/common.h"
#include "util/fibers/uring_proactor.h"
ABSL_FLAG(bool, backing_file_direct, false, "If true uses O_DIRECT to open backing files");
namespace dfly::tiering {
using namespace std;
using namespace util;
using namespace facade;
using Proactor = fb2::UringProactor;
using fb2::ProactorBase;
using fb2::SubmitEntry;
constexpr size_t kInitialSize = 1UL << 28; // 256MB
error_code IoMgr::Open(std::string_view path) {
CHECK(!backing_file_);
int kFlags = O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC;
if (absl::GetFlag(FLAGS_backing_file_direct)) {
kFlags |= O_DIRECT;
}
auto res = fb2::OpenLinux(path, kFlags, 0666);
if (!res)
return res.error();
backing_file_ = std::move(res.value());
Proactor* proactor = (Proactor*)ProactorBase::me();
{
fb2::FiberCall fc(proactor);
fc->PrepFallocate(backing_file_->fd(), 0, 0, kInitialSize);
fb2::FiberCall::IoResult io_res = fc.Get();
if (io_res < 0) {
return error_code{-io_res, system_category()};
}
}
{
fb2::FiberCall fc(proactor);
fc->PrepFadvise(backing_file_->fd(), 0, 0, POSIX_FADV_RANDOM);
fb2::FiberCall::IoResult io_res = fc.Get();
if (io_res < 0) {
return error_code{-io_res, system_category()};
}
}
sz_ = kInitialSize;
return error_code{};
}
error_code IoMgr::Grow(size_t len) {
Proactor* proactor = (Proactor*)ProactorBase::me();
if (exchange(grow_progress_, true))
return make_error_code(errc::operation_in_progress);
fb2::FiberCall fc(proactor);
fc->PrepFallocate(backing_file_->fd(), 0, sz_, len);
Proactor::IoResult res = fc.Get();
grow_progress_ = false;
if (res == 0) {
sz_ += len;
return {};
} else {
return std::error_code(-res, std::iostream_category());
}
}
void IoMgr::WriteAsync(size_t offset, util::fb2::UringBuf buf, WriteCb cb) {
DCHECK(!buf.bytes.empty());
auto* proactor = static_cast<Proactor*>(ProactorBase::me());
auto ring_cb = [cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t flags) { cb(res); };
SubmitEntry se = proactor->GetSubmitEntry(std::move(ring_cb), 0);
if (buf.buf_idx)
se.PrepWriteFixed(backing_file_->fd(), buf.bytes.data(), buf.bytes.size(), offset,
*buf.buf_idx);
else
se.PrepWrite(backing_file_->fd(), buf.bytes.data(), buf.bytes.size(), offset);
}
void IoMgr::ReadAsync(size_t offset, util::fb2::UringBuf buf, ReadCb cb) {
DCHECK(!buf.bytes.empty());
auto* proactor = static_cast<Proactor*>(ProactorBase::me());
auto ring_cb = [cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t flags) { cb(res); };
SubmitEntry se = proactor->GetSubmitEntry(std::move(ring_cb), 0);
if (buf.buf_idx)
se.PrepReadFixed(backing_file_->fd(), buf.bytes.data(), buf.bytes.size(), offset, *buf.buf_idx);
else
se.PrepRead(backing_file_->fd(), buf.bytes.data(), buf.bytes.size(), offset);
}
void IoMgr::Shutdown() {
while (grow_progress_) {
ThisFiber::SleepFor(200us); // TODO: hacky for now.
}
backing_file_->Close();
backing_file_.reset();
}
} // namespace dfly::tiering

View file

@ -1,57 +0,0 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <functional>
#include <string>
#include "server/common.h"
#include "util/fibers/uring_file.h"
#include "util/fibers/uring_proactor.h"
namespace dfly::tiering {
class IoMgr {
public:
// first arg - io result.
// using WriteCb = fu2::function_base<true, false, fu2::capacity_default, false, false,
// void(int)>;
using WriteCb = std::function<void(int)>;
using ReadCb = std::function<void(int)>;
// blocks until all the pending requests are finished.
void Shutdown();
std::error_code Open(std::string_view path);
// Try growing file by that length. Return error if growth failed.
std::error_code Grow(size_t len);
// Write into offset from src and call cb once done. The callback is guaranteed to be invoked in
// any error case for cleanup. The src buffer must outlive the call, until cb is resolved.
void WriteAsync(size_t offset, util::fb2::UringBuf src, WriteCb cb);
// Read into dest and call cb once read. The callback is guaranteed to be invoked in any error
// case for cleanup. The dest buffer must outlive the call, until cb is resolved.
void ReadAsync(size_t offset, util::fb2::UringBuf dest, ReadCb cb);
// Total file span
size_t Span() const {
return sz_;
}
bool grow_pending() const {
return grow_progress_;
}
private:
std::unique_ptr<util::fb2::LinuxFile> backing_file_;
size_t sz_ = 0;
bool grow_progress_ = false;
};
} // namespace dfly::tiering

View file

@ -11,6 +11,7 @@
#include "io/io.h"
#include "server/tiering/common.h"
#include "server/tiering/disk_storage.h"
#include "util/fibers/fibers.h"
namespace dfly::tiering {
namespace {