1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Organize code and make the write asynchronous

This commit is contained in:
Roman Gershman 2022-04-14 21:31:31 +03:00
parent bc92ace19c
commit e5a3a83bae
5 changed files with 83 additions and 28 deletions

View file

@ -124,7 +124,10 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
".back");
shard_->io_mgr_.reset(new IoMgr);
error_code ec = shard_->io_mgr_->Open(fn);
CHECK(!ec) << ec.message(); // TODO
CHECK(!ec) << ec.message(); // TODO
if (shard_->io_mgr_->Size()) { // Add initial storage.
shard_->ext_alloc_.AddStorage(0, shard_->io_mgr_->Size());
}
}
}
@ -541,6 +544,48 @@ size_t EngineShard::UsedMemory() const {
return mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal();
}
void EngineShard::AddItemToUnload(string_view blob) {
DCHECK(io_mgr_);
size_t grow_size = 0;
int64_t res = ext_alloc_.Malloc(blob.size());
if (res >= 0) {
auto cb = [](int res) {};
io_mgr_->WriteAsync(res, blob, cb);
} else {
grow_size = -res;
}
if (grow_size == 0 && ext_alloc_.allocated_bytes() > size_t(ext_alloc_.capacity() * 0.85)) {
grow_size = 1ULL << 28;
}
if (grow_size && !io_mgr_->grow_pending()) {
size_t start = io_mgr_->Size();
auto cb = [start, grow_size, this](int io_res) {
if (io_res == 0) {
ext_alloc_.AddStorage(start, grow_size);
} else {
LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res;
}
};
io_mgr_->GrowAsync(grow_size, move(cb));
}
}
/**
_____ _ ____ _ _ ____ _
| ____| _ __ __ _ (_) _ __ ___ / ___| | |__ __ _ _ __ __| |/ ___| ___ | |_
| _| | '_ \ / _` || || '_ \ / _ \\___ \ | '_ \ / _` || '__|/ _` |\___ \ / _ \| __|
| |___ | | | || (_| || || | | || __/ ___) || | | || (_| || | | (_| | ___) || __/| |_
|_____||_| |_| \__, ||_||_| |_| \___||____/ |_| |_| \__,_||_| \__,_||____/ \___| \__|
|___/
*/
void EngineShardSet::Init(uint32_t sz) {
CHECK_EQ(0u, size());
cached_stats.resize(sz);

View file

@ -137,6 +137,8 @@ class EngineShard {
return io_mgr_.get();
}
void AddItemToUnload(std::string_view blob);
// for everyone to use for string transformations during atomic cpu sequences.
sds tmp_str1, tmp_str2;

View file

@ -23,6 +23,8 @@ IoMgr::IoMgr() {
flags_val = 0;
}
constexpr size_t kInitialSize = 1UL << 28; // 256MB
error_code IoMgr::Open(const string& path) {
CHECK(!backing_file_);
@ -30,7 +32,15 @@ error_code IoMgr::Open(const string& path) {
if (!res)
return res.error();
backing_file_ = move(res.value());
Proactor* proactor = (Proactor*)ProactorBase::me();
uring::FiberCall fc(proactor);
fc->PrepFallocate(backing_file_->fd(), 0, 0, kInitialSize);
FiberCall::IoResult io_res = fc.Get();
if (io_res < 0) {
return error_code{-io_res, system_category()};
}
sz_ = kInitialSize;
return error_code{};
}
@ -44,11 +54,12 @@ error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
Proactor* proactor = (Proactor*)ProactorBase::me();
uring::SubmitEntry entry = proactor->GetSubmitEntry(
[this, cb = move(cb)](Proactor::IoResult res, uint32_t , int64_t arg) {
[this, cb = move(cb)](Proactor::IoResult res, uint32_t, int64_t arg) {
this->flags.grow_progress = 0;
sz_ += (res == 0 ? arg : 0);
cb(res);
}, len);
},
len);
entry.PrepFallocate(backing_file_->fd(), 0, sz_, len);
flags.grow_progress = 1;
@ -56,16 +67,23 @@ error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
return error_code{};
}
error_code IoMgr::GetBlockAsync(string_view buf, int64_t arg, CbType cb) {
/*uring::Proactor* proactor = (uring::Proactor*)ProactorBase::me();
error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) {
DCHECK(!blob.empty());
auto mgr_cb = [cb = move(cb)](uring::Proactor::IoResult res, uint32_t flags, int64_t payload) {
cb(res, 4096);
uring::Proactor* proactor = (uring::Proactor*)ProactorBase::me();
uint8_t* ptr = new uint8_t[blob.size()];
memcpy(ptr, blob.data(), blob.size());
auto ring_cb = [ptr, cb = move(cb)](uring::Proactor::IoResult res, uint32_t flags,
int64_t payload) {
cb(res);
delete[] ptr;
};
uring::SubmitEntry se = proactor->GetSubmitEntry(move(mgr_cb), 0);
se.PrepWrite(backing_file_->fd(), str_.data(), str_.size(), 4096);
*/
uring::SubmitEntry se = proactor->GetSubmitEntry(move(ring_cb), 0);
se.PrepWrite(backing_file_->fd(), ptr, blob.size(), offset);
return error_code{};
}

View file

@ -14,8 +14,7 @@ namespace dfly {
class IoMgr {
public:
// first arg - io result.
// second arg - an offset to the buffer in the backing file.
using CbType = std::function<void(int, uint64_t)>;
using WriteCb = std::function<void(int)>;
// (io_res, )
using GrowCb = std::function<void(int)>;
@ -35,12 +34,14 @@ class IoMgr {
return backing_file_->Write(io::Buffer(blob), offset, 0);
}
// Returns error if submission failed. Otherwise - returns the error code
// via cb. if no error is returned - buf must live until cb is called.
std::error_code GetBlockAsync(std::string_view buf, int64_t arg, CbType cb);
// Returns error if submission failed. Otherwise - returns the io result
// via cb.
std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb);
size_t Size() const { return sz_; }
bool grow_pending() const { return flags.grow_progress;}
private:
std::unique_ptr<util::uring::LinuxFile> backing_file_;
size_t sz_ = 0;

View file

@ -98,19 +98,8 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
IoMgr* io_mgr = shard->io_mgr();
if (io_mgr) { // external storage enabled.
ExternalAllocator* ext_alloc = shard->external_allocator();
int64_t res = ext_alloc->Malloc(value.size());
if (res < 0) {
size_t start = io_mgr->Size();
io_mgr->GrowAsync(-res, [start, len = -res, ext_alloc](int io_res) {
if (io_res == 0) {
ext_alloc->AddStorage(start, len);
} else {
LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res;
}
});
} else {
io_mgr->Write(res, value);
if (value.size() >= 64) {
shard->AddItemToUnload(value);
}
}