1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Support external storage for GET requests

This commit is contained in:
Roman Gershman 2022-04-21 23:37:46 +03:00
parent dce0ce3d69
commit 7b9bad35e9
17 changed files with 183 additions and 75 deletions

2
helio

@ -1 +1 @@
Subproject commit bb7c065c289329c301c79afb2d3a8a6410b57a28 Subproject commit 0ef31fbae12111fec2d49eab98922961ff871049

View file

@ -794,10 +794,8 @@ bool CompactObj::HasAllocated() const {
} }
void __attribute__((noinline)) CompactObj::GetString(string* res) const { void __attribute__((noinline)) CompactObj::GetString(string* res) const {
string_view slice = GetSlice(res); res->resize(Size());
if (res->data() != slice.data()) { GetString(res->data());
res->assign(slice);
}
} }
void CompactObj::GetString(char* dest) const { void CompactObj::GetString(char* dest) const {
@ -879,7 +877,7 @@ void CompactObj::SetExternal(size_t offset, size_t sz) {
u_.ext_ptr.size = sz; u_.ext_ptr.size = sz;
} }
std::pair<size_t, size_t> CompactObj::GetExternalPtr() { std::pair<size_t, size_t> CompactObj::GetExternalPtr() const {
DCHECK_EQ(EXTERNAL_TAG, taglen_); DCHECK_EQ(EXTERNAL_TAG, taglen_);
return pair<size_t, size_t>(size_t(u_.ext_ptr.offset), size_t(u_.ext_ptr.size)); return pair<size_t, size_t>(size_t(u_.ext_ptr.offset), size_t(u_.ext_ptr.size));
} }

View file

@ -252,7 +252,7 @@ class CompactObj {
return taglen_ == EXTERNAL_TAG; return taglen_ == EXTERNAL_TAG;
} }
void SetExternal(size_t offset, size_t sz); void SetExternal(size_t offset, size_t sz);
std::pair<size_t, size_t> GetExternalPtr(); std::pair<size_t, size_t> GetExternalPtr() const;
// In case this object a single blob, returns number of bytes allocated on heap // In case this object a single blob, returns number of bytes allocated on heap
// for that blob. Otherwise returns 0. // for that blob. Otherwise returns 0.

View file

@ -123,9 +123,13 @@ struct Page {
uint16_t available; uint16_t available;
uint8_t reserved2[2]; uint8_t reserved2[2];
Page() { // We can not use c'tor because we use the trick in segment where we allocate more pages
memset(&id, 0, sizeof(Page) - offsetof(Page, id)); // than SegmentDescr declares.
void Reset(uint8_t new_id) {
static_assert(sizeof(Page) == 40); static_assert(sizeof(Page) == 40);
memset(&id, 0, sizeof(Page) - offsetof(Page, id));
id = new_id;
} }
void Init(PageClass pc, BinIdx bin_id) { void Init(PageClass pc, BinIdx bin_id) {
@ -220,8 +224,9 @@ ExternalAllocator::SegmentDescr::SegmentDescr(PageClass pc, size_t offs, uint16_
if (pc == PageClass::MEDIUM_P) if (pc == PageClass::MEDIUM_P)
page_shift_ = kMediumPageShift; page_shift_ = kMediumPageShift;
for (unsigned i = 0; i < capacity; ++i) { for (unsigned i = 0; i < capacity; ++i) {
pages_[i].id = i; pages_[i].Reset(i);
} }
} }
@ -234,6 +239,8 @@ auto ExternalAllocator::SegmentDescr::FindPageSegment() -> Page* {
} }
} }
LOG(DFATAL) << "Should not reach here";
return nullptr; return nullptr;
} }
@ -244,6 +251,12 @@ ExternalAllocator::ExternalAllocator() {
std::fill(free_pages_, free_pages_ + detail::kNumSizeBins, &empty_page); std::fill(free_pages_, free_pages_ + detail::kNumSizeBins, &empty_page);
} }
ExternalAllocator::~ExternalAllocator() {
for (auto* seg : segments_) {
mi_free(seg);
}
}
int64_t ExternalAllocator::Malloc(size_t sz) { int64_t ExternalAllocator::Malloc(size_t sz) {
uint8_t bin_idx = ToBinIdx(sz); uint8_t bin_idx = ToBinIdx(sz);
Page* page = free_pages_[bin_idx]; Page* page = free_pages_[bin_idx];
@ -352,6 +365,7 @@ auto ExternalAllocator::FindPage(PageClass pc, size_t* seg_size) -> Page* {
if (seg->HasFreePages()) { if (seg->HasFreePages()) {
return seg->FindPageSegment(); return seg->FindPageSegment();
} }
// remove head. // remove head.
SegmentDescr* next = seg->next; SegmentDescr* next = seg->next;
if (next == seg->prev) { if (next == seg->prev) {
@ -387,6 +401,7 @@ auto ExternalAllocator::FindPage(PageClass pc, size_t* seg_size) -> Page* {
sq_[pc] = seg; sq_[pc] = seg;
return seg->FindPageSegment(); return seg->FindPageSegment();
} }
*seg_size = kSegmentDefaultSize; *seg_size = kSegmentDefaultSize;
return nullptr; return nullptr;
} }
@ -421,7 +436,7 @@ void ExternalAllocator::FreePage(Page* page, SegmentDescr* owner, size_t block_s
sq->prev = owner; sq->prev = owner;
} }
} }
++owner->used_; --owner->used_;
} }
inline auto ExternalAllocator::ToSegDescr(Page* page) -> SegmentDescr* { inline auto ExternalAllocator::ToSegDescr(Page* page) -> SegmentDescr* {

View file

@ -50,6 +50,7 @@ class ExternalAllocator {
static constexpr size_t kExtAlignment = 1ULL << 28; // 256 MB static constexpr size_t kExtAlignment = 1ULL << 28; // 256 MB
ExternalAllocator(); ExternalAllocator();
~ExternalAllocator();
// If a negative result - backing storage is required of size=-result. See AddStorage // If a negative result - backing storage is required of size=-result. See AddStorage
// on how to add more storage. // on how to add more storage.

View file

@ -5,6 +5,7 @@
#include "core/external_alloc.h" #include "core/external_alloc.h"
#include "base/gtest.h" #include "base/gtest.h"
#include "base/logging.h"
namespace dfly { namespace dfly {
@ -23,6 +24,25 @@ class ExternalAllocatorTest : public ::testing::Test {
constexpr int64_t kSegSize = 1 << 28; constexpr int64_t kSegSize = 1 << 28;
std::map<int64_t, size_t> AllocateFully(ExternalAllocator* alloc) {
std::map<int64_t, size_t> ranges;
int64_t res = 0;
while (res >= 0) {
for (unsigned j = 1; j < 5; ++j) {
size_t sz = 4000 * j;
res = alloc->Malloc(sz);
if (res < 0)
break;
auto [it, added] = ranges.emplace(res, sz);
VLOG(1) << "res: " << res << " size: " << sz << " added: " << added;
CHECK(added);
}
}
return ranges;
}
TEST_F(ExternalAllocatorTest, Basic) { TEST_F(ExternalAllocatorTest, Basic) {
int64_t res = ext_alloc_.Malloc(128); int64_t res = ext_alloc_.Malloc(128);
EXPECT_EQ(-kSegSize, res); EXPECT_EQ(-kSegSize, res);
@ -32,7 +52,7 @@ TEST_F(ExternalAllocatorTest, Basic) {
EXPECT_EQ(4096, ext_alloc_.Malloc(4096)); EXPECT_EQ(4096, ext_alloc_.Malloc(4096));
EXPECT_EQ(1048576, ext_alloc_.Malloc(8192)); // another page. EXPECT_EQ(1048576, ext_alloc_.Malloc(8192)); // another page.
ext_alloc_.Free(1048576, 8192); // should return the page to the segment. ext_alloc_.Free(1048576, 8192); // should return the page to the segment.
EXPECT_EQ(1048576, ext_alloc_.Malloc(1 << 14)); // another page. EXPECT_EQ(1048576, ext_alloc_.Malloc(1 << 14)); // another page.
ext_alloc_.Free(0, 4000); ext_alloc_.Free(0, 4000);
@ -43,20 +63,7 @@ TEST_F(ExternalAllocatorTest, Basic) {
TEST_F(ExternalAllocatorTest, Invariants) { TEST_F(ExternalAllocatorTest, Invariants) {
ext_alloc_.AddStorage(0, kSegSize); ext_alloc_.AddStorage(0, kSegSize);
std::map<int64_t, size_t> ranges; auto ranges = AllocateFully(&ext_alloc_);
int64_t res = 0;
while (res >= 0) {
for (unsigned j = 1; j < 5; ++j) {
size_t sz = 4000 * j;
res = ext_alloc_.Malloc(sz);
if (res < 0)
break;
auto [it, added] = ranges.emplace(res, sz);
ASSERT_TRUE(added);
}
}
EXPECT_GT(ext_alloc_.allocated_bytes(), ext_alloc_.capacity() * 0.75); EXPECT_GT(ext_alloc_.allocated_bytes(), ext_alloc_.capacity() * 0.75);
off_t last = 0; off_t last = 0;
@ -64,6 +71,16 @@ TEST_F(ExternalAllocatorTest, Invariants) {
ASSERT_GE(k_v.first, last); ASSERT_GE(k_v.first, last);
last = k_v.first + k_v.second; last = k_v.first + k_v.second;
} }
for (const auto& k_v : ranges) {
ext_alloc_.Free(k_v.first, k_v.second);
}
EXPECT_EQ(0, ext_alloc_.allocated_bytes());
for (const auto& k_v : ranges) {
int64_t res = ext_alloc_.Malloc(k_v.second);
ASSERT_GE(res, 0);
}
} }
} // namespace dfly } // namespace dfly

View file

@ -18,6 +18,7 @@ thread_local ServerState ServerState::state_;
atomic_uint64_t used_mem_peak(0); atomic_uint64_t used_mem_peak(0);
atomic_uint64_t used_mem_current(0); atomic_uint64_t used_mem_current(0);
unsigned kernel_version = 0;
ServerState::ServerState() { ServerState::ServerState() {
} }

View file

@ -76,4 +76,8 @@ bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes);
extern std::atomic_uint64_t used_mem_peak; extern std::atomic_uint64_t used_mem_peak;
extern std::atomic_uint64_t used_mem_current; extern std::atomic_uint64_t used_mem_current;
// version 5.11 maps to 511 etc.
// set upon server start.
extern unsigned kernel_version;
} // namespace dfly } // namespace dfly

View file

@ -41,12 +41,12 @@ struct PopulateBatch {
}; };
void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params, void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params,
const PopulateBatch& ps) { const PopulateBatch& batch) {
SetCmd sg(&EngineShard::tlocal()->db_slice()); SetCmd sg(&EngineShard::tlocal()->db_slice());
for (unsigned i = 0; i < ps.sz; ++i) { for (unsigned i = 0; i < batch.sz; ++i) {
string key = absl::StrCat(prefix, ":", ps.index[i]); string key = absl::StrCat(prefix, ":", batch.index[i]);
string val = absl::StrCat("value:", ps.index[i]); string val = absl::StrCat("value:", batch.index[i]);
if (val.size() < val_size) { if (val.size() < val_size) {
val.resize(val_size, 'x'); val.resize(val_size, 'x');
@ -231,24 +231,24 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
for (uint64_t i = from; i < from + len; ++i) { for (uint64_t i = from; i < from + len; ++i) {
absl::StrAppend(&key, i); absl::StrAppend(&key, i);
ShardId sid = Shard(key, ess.size()); ShardId sid = Shard(key, ess.size());
key.resize(prefsize); key.resize(prefsize); // shrink back
auto& pops = ps[sid]; auto& shard_batch = ps[sid];
pops.index[pops.sz++] = i; shard_batch.index[shard_batch.sz++] = i;
if (pops.sz == 32) { if (shard_batch.sz == 32) {
ess.Add(sid, [=, p = pops] { ess.Add(sid, [=] {
DoPopulateBatch(prefix, value_len, params, p); DoPopulateBatch(prefix, value_len, params, shard_batch);
if (i % 50 == 0) { if (i % 50 == 0) {
this_fiber::yield(); this_fiber::yield();
} }
}); });
// we capture pops by value so we can override it here. // we capture shard_batch by value so we can override it here.
pops.sz = 0; shard_batch.sz = 0;
} }
} }
ess.RunBriefInParallel([&](EngineShard* shard) { ess.RunBlockingInParallel([&](EngineShard* shard) {
DoPopulateBatch(prefix, value_len, params, ps[shard->shard_id()]); DoPopulateBatch(prefix, value_len, params, ps[shard->shard_id()]);
}); });
} }

View file

@ -2,11 +2,11 @@
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
// #include <mimalloc-new-delete.h>
#include <mimalloc.h> #include <mimalloc.h>
#include "base/init.h" #include "base/init.h"
#include "base/proc_util.h" #include "base/proc_util.h"
#include "facade/dragonfly_listener.h" #include "facade/dragonfly_listener.h"
#include "server/main_service.h" #include "server/main_service.h"
#include "util/accept_server.h" #include "util/accept_server.h"
@ -25,8 +25,7 @@ using namespace facade;
namespace dfly { namespace dfly {
bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) { bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
if (FLAGS_maxmemory > 0 && FLAGS_maxmemory < pool->size() * 256_MB) {
if (FLAGS_maxmemory > 0 && FLAGS_maxmemory < pool->size() * 256_MB ) {
LOG(ERROR) << "Max memory is less than 256MB per thread. Exiting..."; LOG(ERROR) << "Max memory is less than 256MB per thread. Exiting...";
return false; return false;
} }
@ -58,6 +57,7 @@ int main(int argc, char* argv[]) {
MainInitGuard guard(&argc, &argv); MainInitGuard guard(&argc, &argv);
CHECK_GT(FLAGS_port, 0u); CHECK_GT(FLAGS_port, 0u);
mi_stats_reset();
base::sys::KernelVersion kver; base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver); base::sys::GetKernelVersion(&kver);
@ -66,6 +66,8 @@ int main(int argc, char* argv[]) {
LOG(ERROR) << "Kernel 5.11 or later is supported. Exiting..."; LOG(ERROR) << "Kernel 5.11 or later is supported. Exiting...";
return 1; return 1;
} }
CHECK_LT(kver.minor, 99u);
dfly::kernel_version = kver.major * 100 + kver.minor;
if (FLAGS_use_large_pages) { if (FLAGS_use_large_pages) {
mi_option_enable(mi_option_large_os_pages); mi_option_enable(mi_option_large_os_pages);
@ -73,6 +75,7 @@ int main(int argc, char* argv[]) {
mi_option_enable(mi_option_show_errors); mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0); mi_option_set(mi_option_max_warnings, 0);
uring::UringPool pp{1024}; uring::UringPool pp{1024};
pp.Run(); pp.Run();

View file

@ -10,6 +10,8 @@
#include "facade/facade_types.h" #include "facade/facade_types.h"
#include "util/uring/proactor.h" #include "util/uring/proactor.h"
DEFINE_bool(backing_file_direct, false, "If true uses O_DIRECT to open backing files");
namespace dfly { namespace dfly {
using namespace std; using namespace std;
@ -28,7 +30,10 @@ constexpr size_t kInitialSize = 1UL << 28; // 256MB
error_code IoMgr::Open(const string& path) { error_code IoMgr::Open(const string& path) {
CHECK(!backing_file_); CHECK(!backing_file_);
const int kFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT; int kFlags = O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC;
if (FLAGS_backing_file_direct) {
kFlags |= O_DIRECT;
}
auto res = uring::OpenLinux(path, kFlags, 0666); auto res = uring::OpenLinux(path, kFlags, 0666);
if (!res) if (!res)
return res.error(); return res.error();
@ -70,6 +75,7 @@ error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) { error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) {
DCHECK(!blob.empty()); DCHECK(!blob.empty());
VLOG(1) << "WriteAsync " << offset << "/" << blob.size();
Proactor* proactor = (Proactor*)ProactorBase::me(); Proactor* proactor = (Proactor*)ProactorBase::me();
@ -83,6 +89,11 @@ error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) {
return error_code{}; return error_code{};
} }
error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
iovec v{.iov_base = dest.data(), .iov_len = dest.size()};
return backing_file_->Read(&v, 1, offset, 0);
}
void IoMgr::Shutdown() { void IoMgr::Shutdown() {
while (flags_val) { while (flags_val) {
this_fiber::sleep_for(200us); // TODO: hacky for now. this_fiber::sleep_for(200us); // TODO: hacky for now.

View file

@ -14,6 +14,7 @@ namespace dfly {
class IoMgr { class IoMgr {
public: public:
// first arg - io result. // first arg - io result.
// using WriteCb = fu2::function_base<true, false, fu2::capacity_default, false, false, void(int)>;
using WriteCb = std::function<void(int)>; using WriteCb = std::function<void(int)>;
// (io_res, ) // (io_res, )
@ -33,10 +34,15 @@ class IoMgr {
// Returns error if submission failed. Otherwise - returns the io result // Returns error if submission failed. Otherwise - returns the io result
// via cb. A caller must make sure that the blob exists until cb is called. // via cb. A caller must make sure that the blob exists until cb is called.
std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb); std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb);
std::error_code Read(size_t offset, io::MutableBytes dest);
size_t Size() const { return sz_; } size_t Size() const {
return sz_;
}
bool grow_pending() const { return flags.grow_progress;} bool grow_pending() const {
return flags.grow_progress;
}
private: private:
std::unique_ptr<util::uring::LinuxFile> backing_file_; std::unique_ptr<util::uring::LinuxFile> backing_file_;

View file

@ -469,6 +469,8 @@ tcp_port:)";
append("used_memory:", m.heap_used_bytes); append("used_memory:", m.heap_used_bytes);
append("used_memory_human:", HumanReadableNumBytes(m.heap_used_bytes)); append("used_memory_human:", HumanReadableNumBytes(m.heap_used_bytes));
append("used_memory_peak:", used_mem_peak.load(memory_order_relaxed));
append("comitted_memory:", _mi_stats_main.committed.current); append("comitted_memory:", _mi_stats_main.committed.current);
if (sdata_res.has_value()) { if (sdata_res.has_value()) {
@ -478,8 +480,6 @@ tcp_port:)";
LOG(ERROR) << "Error fetching /proc/self/status stats"; LOG(ERROR) << "Error fetching /proc/self/status stats";
} }
append("used_memory_peak:", used_mem_peak.load(memory_order_relaxed));
// Blob - all these cases where the key/objects are represented by a single blob allocated on // Blob - all these cases where the key/objects are represented by a single blob allocated on
// heap. For example, strings or intsets. members of lists, sets, zsets etc // heap. For example, strings or intsets. members of lists, sets, zsets etc
// are not accounted for to avoid complex computations. In some cases, when number of members // are not accounted for to avoid complex computations. In some cases, when number of members

View file

@ -190,14 +190,8 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1); std::string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<string> { auto cb = [&](Transaction* t, EngineShard* shard) {
OpResult<PrimeIterator> it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING); return OpGet(OpArgs{shard, t->db_index()}, key);
if (!it_res.ok())
return it_res.status();
string val;
it_res.value()->second.GetString(&val);
return val;
}; };
DVLOG(1) << "Before Get::ScheduleSingleHopT " << key; DVLOG(1) << "Before Get::ScheduleSingleHopT " << key;
@ -836,6 +830,30 @@ OpResult<bool> StringFamily::ExtendOrSkip(const OpArgs& op_args, std::string_vie
return new_val.size(); return new_val.size();
} }
OpResult<string> StringFamily::OpGet(const OpArgs& op_args, string_view key) {
OpResult<PrimeIterator> it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
const PrimeValue& pv = it_res.value()->second;
string val;
if (pv.IsExternal()) {
auto* tiered = op_args.shard->tiered_storage();
auto [offset, size] = pv.GetExternalPtr();
val.resize(size);
// TODO: can not work with O_DIRECT
error_code ec = tiered->Read(offset, size, val.data());
CHECK(!ec) << "TBD: " << ec;
} else {
it_res.value()->second.GetString(&val);
}
return val;
}
void StringFamily::Init(util::ProactorPool* pp) { void StringFamily::Init(util::ProactorPool* pp) {
set_qps.Init(pp); set_qps.Init(pp);
get_qps.Init(pp); get_qps.Init(pp);

View file

@ -98,6 +98,9 @@ class StringFamily {
// Returns true if was extended, false if the key was not found. // Returns true if was extended, false if the key was not found.
static OpResult<bool> ExtendOrSkip(const OpArgs& op_args, std::string_view key, static OpResult<bool> ExtendOrSkip(const OpArgs& op_args, std::string_view key,
std::string_view val, bool prepend); std::string_view val, bool prepend);
static OpResult<std::string> OpGet(const OpArgs& op_args, std::string_view key);
}; };
} // namespace dfly } // namespace dfly

View file

@ -8,6 +8,8 @@ extern "C" {
#include "redis/object.h" #include "redis/object.h"
} }
#include <mimalloc.h>
#include "base/logging.h" #include "base/logging.h"
#include "server/db_slice.h" #include "server/db_slice.h"
#include "util/proactor_base.h" #include "util/proactor_base.h"
@ -19,7 +21,8 @@ struct IndexKey {
DbIndex db_indx; DbIndex db_indx;
PrimeKey key; PrimeKey key;
IndexKey() {} IndexKey() {
}
// We define here a weird copy constructor because map uses pair<const PrimeKey,..> // We define here a weird copy constructor because map uses pair<const PrimeKey,..>
// and "const" prevents moving IndexKey. // and "const" prevents moving IndexKey.
@ -28,7 +31,8 @@ struct IndexKey {
IndexKey(IndexKey&&) = default; IndexKey(IndexKey&&) = default;
IndexKey(DbIndex i, PrimeKey k) : db_indx(i), key(std::move(k)) {} IndexKey(DbIndex i, PrimeKey k) : db_indx(i), key(std::move(k)) {
}
bool operator==(const IndexKey& ik) const { bool operator==(const IndexKey& ik) const {
return ik.db_indx == db_indx && ik.key == key; return ik.db_indx == db_indx && ik.key == key;
@ -48,19 +52,34 @@ struct TieredStorage::ActiveIoRequest {
char* block_ptr; char* block_ptr;
// entry -> offset // entry -> offset
absl::flat_hash_map<IndexKey, size_t, EntryHash> entries; /*absl::flat_hash_map<IndexKey, size_t, EntryHash, std::equal_to<>,
mi_stl_allocator<std::pair<const IndexKey, size_t>>>*/
absl::flat_hash_map<IndexKey, size_t, EntryHash, std::equal_to<>> entries;
ActiveIoRequest(size_t sz) { ActiveIoRequest(size_t sz) {
DCHECK_EQ(0u, sz % 4096); DCHECK_EQ(0u, sz % 4096);
block_ptr = (char*)aligned_malloc(sz, 4096); block_ptr = (char*)mi_malloc_aligned(sz, 4096);
DCHECK_EQ(0, intptr_t(block_ptr) % 4096); DCHECK_EQ(0, intptr_t(block_ptr) % 4096);
} }
~ActiveIoRequest() { ~ActiveIoRequest() {
free(block_ptr); mi_free(block_ptr);
} }
}; };
void TieredStorage::SendIoRequest(size_t offset, size_t req_size, ActiveIoRequest* req) {
#if 1
// static string tmp(4096, 'x');
// string_view sv{tmp};
string_view sv{req->block_ptr, req_size};
auto cb = [this, req](int res) { FinishIoRequest(res, req); };
io_mgr_.WriteAsync(offset, sv, move(cb));
#else
FinishIoRequest(0, req);
#endif
}
void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) { void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) {
bool success = true; bool success = true;
if (io_res < 0) { if (io_res < 0) {
@ -81,8 +100,10 @@ void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) {
it->second.SetExternal(k_v.second, item_size); it->second.SetExternal(k_v.second, item_size);
} }
} }
--num_active_requests_;
delete req; delete req;
VLOG_IF(1, num_active_requests_ == 0) << "Finished active requests";
} }
TieredStorage::TieredStorage(DbSlice* db_slice) : db_slice_(*db_slice) { TieredStorage::TieredStorage(DbSlice* db_slice) : db_slice_(*db_slice) {
@ -107,10 +128,12 @@ void TieredStorage::Shutdown() {
io_mgr_.Shutdown(); io_mgr_.Shutdown();
} }
void TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) { error_code TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) {
CHECK_EQ(OBJ_STRING, it->second.ObjType()); CHECK_EQ(OBJ_STRING, it->second.ObjType());
size_t blob_len = it->second.Size(); size_t blob_len = it->second.Size();
error_code ec;
pending_unload_bytes_ += blob_len; pending_unload_bytes_ += blob_len;
if (db_index >= db_arr_.size()) { if (db_index >= db_arr_.size()) {
db_arr_.resize(db_index + 1); db_arr_.resize(db_index + 1);
@ -143,8 +166,10 @@ void TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) {
} }
}; };
io_mgr_.GrowAsync(grow_size, move(cb)); ec = io_mgr_.GrowAsync(grow_size, move(cb));
} }
return ec;
} }
size_t TieredStorage::SerializePendingItems() { size_t TieredStorage::SerializePendingItems() {
@ -201,10 +226,7 @@ size_t TieredStorage::SerializePendingItems() {
++submitted_io_writes_; ++submitted_io_writes_;
submitted_io_write_size_ += open_block_size; submitted_io_write_size_ += open_block_size;
string_view sv{active_req->block_ptr, open_block_size}; SendIoRequest(file_offset, open_block_size, active_req);
auto cb = [this, active_req](int res) { FinishIoRequest(res, active_req); };
io_mgr_.WriteAsync(file_offset, sv, move(cb));
open_block_size = 0; open_block_size = 0;
} }
@ -217,6 +239,7 @@ size_t TieredStorage::SerializePendingItems() {
file_offset = res; file_offset = res;
open_block_size = ExternalAllocator::GoodSize(item_size); open_block_size = ExternalAllocator::GoodSize(item_size);
block_offset = 0; block_offset = 0;
++num_active_requests_;
active_req = new ActiveIoRequest(open_block_size); active_req = new ActiveIoRequest(open_block_size);
} }
@ -228,20 +251,20 @@ size_t TieredStorage::SerializePendingItems() {
it->second.SetIoPending(true); it->second.SetIoPending(true);
IndexKey key(db_ind, it->first.AsRef()); IndexKey key(db_ind, it->first.AsRef());
active_req->entries.try_emplace(move(key), file_offset + block_offset); bool added = active_req->entries.emplace(move(key), file_offset + block_offset).second;
CHECK(added);
block_offset += item_size; // saved into opened block. block_offset += item_size; // saved into opened block.
pending_unload_bytes_ -= item_size; pending_unload_bytes_ -= item_size;
} }
count = 0; count = 0;
db->pending_upload.erase(cursor_val); db->pending_upload.erase(cursor_val);
} // sorted_cursors } // sorted_cursors
} // db_arr
DCHECK(db->pending_upload.empty());
} // db_arr
if (open_block_size > 0) { if (open_block_size > 0) {
auto cb = [this, active_req](int res) { FinishIoRequest(res, active_req); }; SendIoRequest(file_offset, open_block_size, active_req);
string_view sv{active_req->block_ptr, open_block_size};
io_mgr_.WriteAsync(file_offset, sv, move(cb));
} }
return 0; return 0;

View file

@ -21,8 +21,13 @@ class TieredStorage {
std::error_code Open(const std::string& path); std::error_code Open(const std::string& path);
std::error_code Read(size_t offset, size_t len, char* dest) {
return io_mgr_.Read(offset, io::MutableBytes{reinterpret_cast<uint8_t*>(dest), len});
}
std::error_code UnloadItem(DbIndex db_index, PrimeIterator it);
void Shutdown(); void Shutdown();
void UnloadItem(DbIndex db_index, PrimeIterator it);
private: private:
struct ActiveIoRequest; struct ActiveIoRequest;
@ -30,8 +35,10 @@ class TieredStorage {
// return 0 if everything was sent. // return 0 if everything was sent.
// if more storage is needed returns requested size in bytes. // if more storage is needed returns requested size in bytes.
size_t SerializePendingItems(); size_t SerializePendingItems();
void SendIoRequest(size_t offset, size_t req_size, ActiveIoRequest* req);
void FinishIoRequest(int io_res, ActiveIoRequest* req); void FinishIoRequest(int io_res, ActiveIoRequest* req);
DbSlice& db_slice_; DbSlice& db_slice_;
IoMgr io_mgr_; IoMgr io_mgr_;
ExternalAllocator alloc_; ExternalAllocator alloc_;
@ -39,8 +46,9 @@ class TieredStorage {
size_t pending_unload_bytes_ = 0; size_t pending_unload_bytes_ = 0;
size_t submitted_io_writes_ = 0; size_t submitted_io_writes_ = 0;
size_t submitted_io_write_size_ = 0; size_t submitted_io_write_size_ = 0;
uint32_t num_active_requests_ = 0;
struct Hasher { struct Hasher {
size_t operator()(const PrimeKey& o) const { size_t operator()(const PrimeKey& o) const {
return o.HashCode(); return o.HashCode();
} }