mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat(tiering): simple offload loop (#2987)
Simple offloading for tiering Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
8697b20a9e
commit
f27506e678
12 changed files with 144 additions and 19 deletions
|
@ -120,6 +120,9 @@ class CompactObj {
|
|||
// while ASCII2_ENC_BIT rounds it up. See DecodedLen implementation for more info.
|
||||
ASCII1_ENC_BIT = 8,
|
||||
ASCII2_ENC_BIT = 0x10,
|
||||
|
||||
// IO_PENDING is set when the tiered storage has issued an i/o request to save the value. It is
|
||||
// cleared when the io request finishes or is cancelled.
|
||||
IO_PENDING = 0x20,
|
||||
STICKY = 0x40,
|
||||
|
||||
|
|
|
@ -701,6 +701,14 @@ class DashCursor {
|
|||
return val_ != 0;
|
||||
}
|
||||
|
||||
// Two cursors are equal when they hold the same raw cursor value.
bool operator==(const DashCursor& other) const {
  return other.val_ == val_;
}
|
||||
|
||||
// Two cursors differ when their raw cursor values differ.
bool operator!=(const DashCursor& other) const {
  return val_ != other.val_;
}
|
||||
|
||||
private:
|
||||
uint64_t val_;
|
||||
};
|
||||
|
|
|
@ -258,11 +258,12 @@ bool ParseDouble(string_view src, double* value) {
|
|||
#define ADD(x) (x) += o.x
|
||||
|
||||
TieredStats& TieredStats::operator+=(const TieredStats& o) {
|
||||
static_assert(sizeof(TieredStats) == 80);
|
||||
static_assert(sizeof(TieredStats) == 88);
|
||||
|
||||
ADD(total_stashes);
|
||||
ADD(total_fetches);
|
||||
ADD(total_cancels);
|
||||
ADD(total_deletes);
|
||||
|
||||
ADD(allocated_bytes);
|
||||
ADD(capacity_bytes);
|
||||
|
|
|
@ -63,6 +63,7 @@ struct TieredStats {
|
|||
size_t total_stashes = 0;
|
||||
size_t total_fetches = 0;
|
||||
size_t total_cancels = 0;
|
||||
size_t total_deletes = 0;
|
||||
|
||||
size_t allocated_bytes = 0;
|
||||
size_t capacity_bytes = 0;
|
||||
|
|
|
@ -51,7 +51,7 @@ ABSL_FLAG(uint32_t, hz, 100,
|
|||
ABSL_FLAG(bool, cache_mode, false,
|
||||
"If true, the backend behaves like a cache, "
|
||||
"by evicting entries when getting close to maxmemory limit");
|
||||
// memory defragmented related flags
|
||||
|
||||
ABSL_FLAG(float, mem_defrag_threshold, 0.7,
|
||||
"Minimum percentage of used memory relative to maxmemory cap before running "
|
||||
"defragmentation");
|
||||
|
@ -582,6 +582,8 @@ void EngineShard::Heartbeat() {
|
|||
}
|
||||
|
||||
ssize_t eviction_redline = (max_memory_limit * kRedLimitFactor) / shard_set->size();
|
||||
size_t tiering_redline =
|
||||
(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) / shard_set->size();
|
||||
|
||||
DbContext db_cntx;
|
||||
db_cntx.time_now_ms = GetCurrentTimeMs();
|
||||
|
@ -603,6 +605,10 @@ void EngineShard::Heartbeat() {
|
|||
if (db_slice_.memory_budget() < eviction_redline) {
|
||||
db_slice_.FreeMemWithEvictionStep(i, eviction_redline - db_slice_.memory_budget());
|
||||
}
|
||||
|
||||
if (tiered_storage_ && UsedMemory() > tiering_redline) {
|
||||
tiered_storage_->RunOffloading(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Journal entries for expired entries are not written to socket in the loop above.
|
||||
|
|
|
@ -2083,9 +2083,13 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
if (should_enter("TIERED", true)) {
|
||||
append("tiered_entries", total.tiered_entries);
|
||||
append("tiered_entries_bytes", total.tiered_used_bytes);
|
||||
|
||||
append("tiered_total_stashes", m.tiered_stats.total_stashes);
|
||||
append("tiered_total_fetches", m.tiered_stats.total_fetches);
|
||||
append("tiered_total_cancels", m.tiered_stats.total_cancels);
|
||||
append("tiered_total_deletes", m.tiered_stats.total_deletes);
|
||||
|
||||
append("tiered_allocated_bytes", m.tiered_stats.allocated_bytes);
|
||||
append("tiered_capacity_bytes", m.tiered_stats.capacity_bytes);
|
||||
|
|
|
@ -39,7 +39,7 @@ DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
|
|||
ADD(listpack_blob_cnt);
|
||||
ADD(listpack_bytes);
|
||||
ADD(tiered_entries);
|
||||
ADD(tiered_size);
|
||||
ADD(tiered_used_bytes);
|
||||
|
||||
for (size_t i = 0; i < o.memory_usage_by_type.size(); ++i) {
|
||||
memory_usage_by_type[i] += o.memory_usage_by_type[i];
|
||||
|
|
|
@ -68,7 +68,7 @@ struct DbTableStats {
|
|||
size_t listpack_blob_cnt = 0;
|
||||
size_t listpack_bytes = 0;
|
||||
size_t tiered_entries = 0;
|
||||
size_t tiered_size = 0;
|
||||
size_t tiered_used_bytes = 0;
|
||||
|
||||
std::array<size_t, OBJ_TYPE_MAX> memory_usage_by_type = {};
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to
|
|||
namespace dfly {
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const DbStats& stats) {
|
||||
os << "keycount: " << stats.key_count << ", tiered_size: " << stats.tiered_size
|
||||
os << "keycount: " << stats.key_count << ", tiered_size: " << stats.tiered_used_bytes
|
||||
<< ", tiered_entries: " << stats.tiered_entries << "\n";
|
||||
|
||||
return os;
|
||||
|
@ -658,7 +658,6 @@ void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function<bool()>& c
|
|||
break;
|
||||
}
|
||||
ThisFiber::SleepFor(5ms);
|
||||
// absl::SleepFor(absl::Milliseconds(10)); ??
|
||||
}
|
||||
|
||||
EXPECT_LE(absl::Now(), deadline)
|
||||
|
|
|
@ -22,10 +22,14 @@
|
|||
#include "server/tiering/common.h"
|
||||
#include "server/tiering/op_manager.h"
|
||||
#include "server/tiering/small_bins.h"
|
||||
#include "server/tx_base.h"
|
||||
|
||||
ABSL_FLAG(bool, tiered_storage_cache_fetched, true,
|
||||
"WIP: Load results of offloaded reads to memory");
|
||||
|
||||
ABSL_FLAG(size_t, tiered_storage_write_depth, 50,
|
||||
"Maximum number of concurrent stash requests issued by background offload");
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using namespace std;
|
||||
|
@ -52,11 +56,28 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
|
|||
cache_fetched_ = absl::GetFlag(FLAGS_tiered_storage_cache_fetched);
|
||||
}
|
||||
|
||||
// Find entry by key in db_slice and store external segment in place of original value
|
||||
// Called before overriding value with segment
|
||||
void RecordAdded(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
|
||||
stats->AddTypeMemoryUsage(pv.ObjType(), -pv.MallocUsed());
|
||||
stats->tiered_entries++;
|
||||
stats->tiered_used_bytes += segment.length;
|
||||
}
|
||||
|
||||
// Called after setting new value in place of previous segment
|
||||
void RecordDeleted(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
|
||||
stats->AddTypeMemoryUsage(pv.ObjType(), pv.MallocUsed());
|
||||
stats->tiered_entries--;
|
||||
stats->tiered_used_bytes -= segment.length;
|
||||
}
|
||||
|
||||
// Find entry by key in db_slice and store external segment in place of original value.
|
||||
// Update memory stats
|
||||
void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
|
||||
if (auto pv = Find(key); pv) {
|
||||
RecordAdded(db_slice_->MutableStats(0), *pv, segment);
|
||||
|
||||
pv->SetIoPending(false);
|
||||
pv->SetExternal(segment.offset, segment.length); // TODO: Handle memory stats
|
||||
pv->SetExternal(segment.offset, segment.length);
|
||||
|
||||
stats_.total_stashes++;
|
||||
}
|
||||
|
@ -82,14 +103,22 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
|
|||
ClearIoPending(key);
|
||||
}
|
||||
|
||||
// Set value to be an in-memory type again, either empty or with a value. Update memory stats
|
||||
void SetInMemory(PrimeValue* pv, string_view value, tiering::DiskSegment segment) {
|
||||
pv->Reset();
|
||||
if (!value.empty())
|
||||
pv->SetString(value);
|
||||
|
||||
RecordDeleted(db_slice_->MutableStats(0), *pv, segment);
|
||||
|
||||
(value.empty() ? stats_.total_deletes : stats_.total_fetches)++;
|
||||
}
|
||||
|
||||
// Find entry by key and store its up-to-date value in place of external segment.
|
||||
// Returns false if the value is outdated, true otherwise
|
||||
bool SetInMemory(OpManager::KeyRef key, string_view value, tiering::DiskSegment segment) {
|
||||
if (auto pv = Find(key); pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
|
||||
pv->Reset(); // TODO: account for memory
|
||||
pv->SetString(value);
|
||||
|
||||
stats_.total_fetches++;
|
||||
SetInMemory(pv, value, segment);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -136,7 +165,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
|
|||
bool cache_fetched_ = false;
|
||||
|
||||
struct {
|
||||
size_t total_stashes = 0, total_fetches = 0, total_cancels = 0;
|
||||
size_t total_stashes = 0, total_fetches = 0, total_cancels = 0, total_deletes = 0;
|
||||
} stats_;
|
||||
|
||||
TieredStorage* ts_;
|
||||
|
@ -219,7 +248,7 @@ void TieredStorage::Delete(PrimeValue* value) {
|
|||
} else if (auto bin = bins_->Delete(segment); bin) {
|
||||
op_manager_->Delete(*bin);
|
||||
}
|
||||
value->Reset();
|
||||
op_manager_->SetInMemory(value, "", segment);
|
||||
}
|
||||
|
||||
void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
|
||||
|
@ -232,7 +261,7 @@ void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue*
|
|||
value->SetIoPending(false);
|
||||
}
|
||||
|
||||
bool TieredStorage::ShouldStash(const PrimeValue& pv) {
|
||||
bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
|
||||
return !pv.IsExternal() && pv.ObjType() == OBJ_STRING && pv.Size() >= kMinValueSize;
|
||||
}
|
||||
|
||||
|
@ -264,4 +293,32 @@ TieredStats TieredStorage::GetStats() const {
|
|||
return stats;
|
||||
}
|
||||
|
||||
// Scans the prime table of `dbid` in segment order, resuming from where the previous call
// stopped (offloading_cursor_), and issues stash requests for values accepted by ShouldStash
// that are neither external nor have i/o pending.
// A single call issues at most (--tiered_storage_write_depth - currently pending stashes)
// requests and performs a bounded number of traversal steps.
void TieredStorage::RunOffloading(DbIndex dbid) {
  // Upper bound on traversal steps per call, keeps the datastore responsive.
  constexpr size_t kMaxIterations = 100;

  // Compare before subtracting: the flag is unsigned, so a plain difference would wrap around
  // when more stashes are pending than the configured write depth.
  const size_t write_depth = absl::GetFlag(FLAGS_tiered_storage_write_depth);
  const size_t pending = op_manager_->GetStats().pending_stash_cnt;
  if (pending >= write_depth)
    return;
  size_t stash_limit = write_depth - pending;

  PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime;

  std::string tmp;
  auto cb = [this, dbid, &tmp, &stash_limit](PrimeIterator it) {
    // One traversal call can invoke this callback for more than one entry, so the budget has
    // to be enforced here and not only in the loop condition below.
    if (stash_limit == 0)
      return;

    if (it->second.HasIoPending() || it->second.IsExternal())
      return;

    if (ShouldStash(it->second)) {
      Stash(dbid, it->first.GetSlice(&tmp), &it->second);
      stash_limit--;
    }
  };

  PrimeTable::Cursor start_cursor{};

  // Loop while we haven't traversed all entries or reached our stash io device limit.
  // Keep number of iterations below reasonable limit to keep datastore always responsive
  size_t iterations = 0;
  do {
    offloading_cursor_ = table.TraverseBySegmentOrder(offloading_cursor_, cb);
  } while (offloading_cursor_ != start_cursor && stash_limit > 0 &&
           iterations++ < kMaxIterations);
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#include <utility>
|
||||
|
||||
#include "server/tiering/common.h"
|
||||
#include "server/tx_base.h"
|
||||
#include "util/fibers/future.h"
|
||||
#ifdef __linux__
|
||||
|
||||
|
@ -60,11 +61,16 @@ class TieredStorage {
|
|||
void CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value);
|
||||
|
||||
// Returns true if a value should be stashed
|
||||
bool ShouldStash(const PrimeValue& pv);
|
||||
bool ShouldStash(const PrimeValue& pv) const;
|
||||
|
||||
TieredStats GetStats() const;
|
||||
|
||||
// Run offloading loop until i/o device is loaded or all entries were traversed
|
||||
void RunOffloading(DbIndex dbid);
|
||||
|
||||
private:
|
||||
PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off
|
||||
|
||||
std::unique_ptr<ShardOpManager> op_manager_;
|
||||
std::unique_ptr<tiering::SmallBins> bins_;
|
||||
};
|
||||
|
@ -127,6 +133,9 @@ class TieredStorage {
|
|||
TieredStats GetStats() const {
|
||||
return {};
|
||||
}
|
||||
|
||||
void RunOffloading() {
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include <gtest/gtest.h>
|
||||
|
||||
#include "absl/flags/internal/flag.h"
|
||||
#include "absl/flags/reflection.h"
|
||||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "facade/facade_test.h"
|
||||
|
@ -19,13 +20,12 @@
|
|||
|
||||
using namespace std;
|
||||
using namespace testing;
|
||||
using absl::SetFlag;
|
||||
using absl::StrCat;
|
||||
|
||||
ABSL_DECLARE_FLAG(bool, force_epoll);
|
||||
ABSL_DECLARE_FLAG(string, tiered_prefix);
|
||||
ABSL_DECLARE_FLAG(bool, tiered_storage_cache_fetched);
|
||||
ABSL_DECLARE_FLAG(bool, backing_file_direct);
|
||||
ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -59,13 +59,18 @@ TEST_F(TieredStorageTest, SimpleGetSet) {
|
|||
Run({"SET", absl::StrCat("k", i), string(i, 'A')});
|
||||
}
|
||||
|
||||
// Make sure all entries were stashed, except the one few not filling a small page
|
||||
// Make sure all entries were stashed, except the one not filling a small page
|
||||
size_t stashes = 0;
|
||||
ExpectConditionWithinTimeout([this, &stashes] {
|
||||
stashes = GetMetrics().tiered_stats.total_stashes;
|
||||
return stashes >= kMax - 256 - 1;
|
||||
});
|
||||
|
||||
// All entries were accounted for except that one (see comment above)
|
||||
auto metrics = GetMetrics();
|
||||
EXPECT_EQ(metrics.db_stats[0].tiered_entries, kMax - kMin - 1);
|
||||
EXPECT_EQ(metrics.db_stats[0].tiered_used_bytes, (kMax - 1 + kMin) * (kMax - kMin) / 2 - 2047);
|
||||
|
||||
// Perform GETSETs
|
||||
for (size_t i = kMin; i < kMax; i++) {
|
||||
auto resp = Run({"GETSET", absl::StrCat("k", i), string(i, 'B')});
|
||||
|
@ -105,4 +110,36 @@ TEST_F(TieredStorageTest, MultiDb) {
|
|||
}
|
||||
}
|
||||
|
||||
// Checks that, with the heartbeat enabled and the offload threshold set to zero, all values
// get stashed in the background, and that they are stashed again after GETs fetched them.
TEST_F(TieredStorageTest, BackgroundOffloading) {
  absl::FlagSaver saver;
  absl::SetFlag(&FLAGS_tiered_offload_threshold, 0.0f);  // offload all values

  // Unsigned, to match the size_t loop counters and metrics it is compared with.
  const size_t kNum = 500;

  max_memory_limit = kNum * 4096;
  pp_->at(0)->AwaitBrief([] { EngineShard::tlocal()->TEST_EnableHeartbeat(); });

  // Stash all values
  for (size_t i = 0; i < kNum; i++) {
    Run({"SET", absl::StrCat("k", i), string(3000, 'A')});
  }

  ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; });
  ASSERT_EQ(GetMetrics().tiered_stats.total_stashes, kNum);
  ASSERT_EQ(GetMetrics().db_stats[0].tiered_entries, kNum);

  // Trigger re-fetch
  for (size_t i = 0; i < kNum; i++) {
    Run({"GET", absl::StrCat("k", i)});
  }

  // Wait for offload to do it all again
  ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; });

  auto metrics = GetMetrics();
  EXPECT_EQ(metrics.tiered_stats.total_stashes, 2 * kNum);
  EXPECT_EQ(metrics.tiered_stats.total_fetches, kNum);
  EXPECT_EQ(metrics.tiered_stats.allocated_bytes, kNum * 4096);
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Reference in a new issue