diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 20e41f42d..b5af249b8 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -124,7 +124,10 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { ".back"); shard_->io_mgr_.reset(new IoMgr); error_code ec = shard_->io_mgr_->Open(fn); - CHECK(!ec) << ec.message(); // TODO + CHECK(!ec) << ec.message(); // TODO + if (shard_->io_mgr_->Size()) { // Add initial storage. + shard_->ext_alloc_.AddStorage(0, shard_->io_mgr_->Size()); + } } } @@ -541,6 +544,48 @@ size_t EngineShard::UsedMemory() const { return mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal(); } +void EngineShard::AddItemToUnload(string_view blob) { + DCHECK(io_mgr_); + + size_t grow_size = 0; + int64_t res = ext_alloc_.Malloc(blob.size()); + if (res >= 0) { + auto cb = [](int res) {}; + io_mgr_->WriteAsync(res, blob, cb); + } else { + grow_size = -res; + } + + if (grow_size == 0 && ext_alloc_.allocated_bytes() > size_t(ext_alloc_.capacity() * 0.85)) { + grow_size = 1ULL << 28; + } + + if (grow_size && !io_mgr_->grow_pending()) { + size_t start = io_mgr_->Size(); + + auto cb = [start, grow_size, this](int io_res) { + if (io_res == 0) { + ext_alloc_.AddStorage(start, grow_size); + } else { + LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res; + } + }; + io_mgr_->GrowAsync(grow_size, move(cb)); + } +} + +/** + + + _____ _ ____ _ _ ____ _ + | ____| _ __ __ _ (_) _ __ ___ / ___| | |__ __ _ _ __ __| |/ ___| ___ | |_ + | _| | '_ \ / _` || || '_ \ / _ \\___ \ | '_ \ / _` || '__|/ _` |\___ \ / _ \| __| + | |___ | | | || (_| || || | | || __/ ___) || | | || (_| || | | (_| | ___) || __/| |_ + |_____||_| |_| \__, ||_||_| |_| \___||____/ |_| |_| \__,_||_| \__,_||____/ \___| \__| + |___/ + + */ + void EngineShardSet::Init(uint32_t sz) { CHECK_EQ(0u, size()); cached_stats.resize(sz); diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 9e2cdf651..9d8084539 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -137,6 +137,8 @@ class EngineShard { return io_mgr_.get(); } + void AddItemToUnload(std::string_view blob); + // for everyone to use for string transformations during atomic cpu sequences. sds tmp_str1, tmp_str2; diff --git a/src/server/io_mgr.cc b/src/server/io_mgr.cc index 5886b5785..f602ae389 100644 --- a/src/server/io_mgr.cc +++ b/src/server/io_mgr.cc @@ -23,6 +23,8 @@ IoMgr::IoMgr() { flags_val = 0; } +constexpr size_t kInitialSize = 1UL << 28; // 256MB + error_code IoMgr::Open(const string& path) { CHECK(!backing_file_); @@ -30,7 +32,15 @@ error_code IoMgr::Open(const string& path) { if (!res) return res.error(); backing_file_ = move(res.value()); + Proactor* proactor = (Proactor*)ProactorBase::me(); + uring::FiberCall fc(proactor); + fc->PrepFallocate(backing_file_->fd(), 0, 0, kInitialSize); + FiberCall::IoResult io_res = fc.Get(); + if (io_res < 0) { + return error_code{-io_res, system_category()}; + } + sz_ = kInitialSize; return error_code{}; } @@ -44,11 +54,12 @@ error_code IoMgr::GrowAsync(size_t len, GrowCb cb) { Proactor* proactor = (Proactor*)ProactorBase::me(); uring::SubmitEntry entry = proactor->GetSubmitEntry( - [this, cb = move(cb)](Proactor::IoResult res, uint32_t , int64_t arg) { + [this, cb = move(cb)](Proactor::IoResult res, uint32_t, int64_t arg) { this->flags.grow_progress = 0; sz_ += (res == 0 ? arg : 0); cb(res); - }, len); + }, + len); entry.PrepFallocate(backing_file_->fd(), 0, sz_, len); flags.grow_progress = 1; @@ -56,16 +67,23 @@ error_code IoMgr::GrowAsync(size_t len, GrowCb cb) { return error_code{}; } -error_code IoMgr::GetBlockAsync(string_view buf, int64_t arg, CbType cb) { - /*uring::Proactor* proactor = (uring::Proactor*)ProactorBase::me(); +error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) { + DCHECK(!blob.empty()); - auto mgr_cb = [cb = move(cb)](uring::Proactor::IoResult res, uint32_t flags, int64_t payload) { - cb(res, 4096); + uring::Proactor* proactor = (uring::Proactor*)ProactorBase::me(); + + uint8_t* ptr = new uint8_t[blob.size()]; + memcpy(ptr, blob.data(), blob.size()); + + auto ring_cb = [ptr, cb = move(cb)](uring::Proactor::IoResult res, uint32_t flags, + int64_t payload) { + cb(res); + delete[] ptr; }; - uring::SubmitEntry se = proactor->GetSubmitEntry(move(mgr_cb), 0); - se.PrepWrite(backing_file_->fd(), str_.data(), str_.size(), 4096); -*/ + uring::SubmitEntry se = proactor->GetSubmitEntry(move(ring_cb), 0); + se.PrepWrite(backing_file_->fd(), ptr, blob.size(), offset); + return error_code{}; } diff --git a/src/server/io_mgr.h b/src/server/io_mgr.h index 6f2c59adc..fac34db4d 100644 --- a/src/server/io_mgr.h +++ b/src/server/io_mgr.h @@ -14,8 +14,7 @@ namespace dfly { class IoMgr { public: // first arg - io result. - // second arg - an offset to the buffer in the backing file. - using CbType = std::function; + using WriteCb = std::function; // (io_res, ) using GrowCb = std::function; @@ -35,12 +34,14 @@ class IoMgr { return backing_file_->Write(io::Buffer(blob), offset, 0); } - // Returns error if submission failed. Otherwise - returns the error code - // via cb. if no error is returned - buf must live until cb is called. - std::error_code GetBlockAsync(std::string_view buf, int64_t arg, CbType cb); + // Returns error if submission failed. Otherwise - returns the io result + // via cb. + std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb); size_t Size() const { return sz_; } + bool grow_pending() const { return flags.grow_progress;} + private: std::unique_ptr backing_file_; size_t sz_ = 0; diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 8ecaeac95..d682e1e54 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -98,19 +98,8 @@ OpResult SetCmd::Set(const SetParams& params, std::string_view key, std::s IoMgr* io_mgr = shard->io_mgr(); if (io_mgr) { // external storage enabled. - ExternalAllocator* ext_alloc = shard->external_allocator(); - int64_t res = ext_alloc->Malloc(value.size()); - if (res < 0) { - size_t start = io_mgr->Size(); - io_mgr->GrowAsync(-res, [start, len = -res, ext_alloc](int io_res) { - if (io_res == 0) { - ext_alloc->AddStorage(start, len); - } else { - LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res; - } - }); - } else { - io_mgr->Write(res, value); + if (value.size() >= 64) { + shard->AddItemToUnload(value); } }