
chore: small fixes around tiering (#3368)

There are no changes in functionality here.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2024-07-23 16:00:50 +03:00 committed by GitHub
parent cd1f9d3923
commit c8a98fd110
11 changed files with 44 additions and 28 deletions

View file

@@ -950,21 +950,23 @@ void CompactObj::SetExternal(size_t offset, uint32_t sz) {
   u_.ext_ptr.offload.page_index = offset / 4096;
 }
 
-void CompactObj::SetCold(size_t offset, uint32_t sz, detail::TieredColdRecord* record) {
-  SetMeta(EXTERNAL_TAG, mask_);
+void CompactObj::SetCool(size_t offset, uint32_t sz, detail::TieredColdRecord* record) {
+  // We copy the mask of the "cooled" referenced object because it contains the encoding info.
+  SetMeta(EXTERNAL_TAG, record->value.mask_);
+
   u_.ext_ptr.is_cool = 1;
   u_.ext_ptr.page_offset = offset % 4096;
   u_.ext_ptr.serialized_size = sz;
-  u_.ext_ptr.cold_record = record;
+  u_.ext_ptr.cool_record = record;
 }
 
-auto CompactObj::GetCold() const -> ColdItem {
+auto CompactObj::GetCool() const -> CoolItem {
   DCHECK(IsExternal() && u_.ext_ptr.is_cool);
-  ColdItem res;
+
+  CoolItem res;
   res.page_offset = u_.ext_ptr.page_offset;
   res.serialized_size = u_.ext_ptr.serialized_size;
-  res.record = u_.ext_ptr.cold_record;
+  res.record = u_.ext_ptr.cool_record;
   return res;
 }
@@ -976,9 +978,9 @@ void CompactObj::ImportExternal(const CompactObj& src) {
 std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
   DCHECK_EQ(EXTERNAL_TAG, taglen_);
-  DCHECK_EQ(u_.ext_ptr.is_cool, 0);
-  size_t offset = size_t(u_.ext_ptr.offload.page_index) * 4096 + u_.ext_ptr.page_offset;
+  auto& ext = u_.ext_ptr;
+  size_t offset = ext.page_offset;
+  offset += size_t(ext.is_cool ? ext.cool_record->page_index : ext.offload.page_index) * 4096;
   return pair<size_t, size_t>(offset, size_t(u_.ext_ptr.serialized_size));
 }
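
Note: the arithmetic above addresses a page-oriented backing file: the byte offset is page_index * 4096 plus the offset within the page, and after this change the page index comes from the cool record whenever the value sits in the cool queue. A minimal standalone sketch of that computation (ExtPtr, ColdRecord and DiskOffset are illustrative stand-ins, not the real CompactObj layout):

#include <cstddef>
#include <cstdint>

constexpr size_t kPageSize = 4096;

struct ColdRecord {     // hypothetical stand-in for detail::TieredColdRecord
  uint32_t page_index;  // stored here because the pointer union has no room for it
};

struct ExtPtr {  // simplified stand-in for the external-pointer fields
  bool is_cool;
  uint16_t page_offset;  // offset within the 4 KiB page
  uint32_t offload_page_index;
  ColdRecord* cool_record;
};

// Mirrors GetExternalSlice(): take the page index from the cool record for
// "cool" values, and from the inline offload field otherwise.
size_t DiskOffset(const ExtPtr& ext) {
  size_t page = ext.is_cool ? ext.cool_record->page_index : ext.offload_page_index;
  return page * kPageSize + ext.page_offset;
}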

View file

@@ -331,14 +331,14 @@ class CompactObj {
   }
 
   void SetExternal(size_t offset, uint32_t sz);
-  void SetCold(size_t offset, uint32_t serialized_size, detail::TieredColdRecord* record);
+  void SetCool(size_t offset, uint32_t serialized_size, detail::TieredColdRecord* record);
 
-  struct ColdItem {
+  struct CoolItem {
     uint16_t page_offset;
     size_t serialized_size;
     detail::TieredColdRecord* record;
   };
-  ColdItem GetCold() const;
+  CoolItem GetCool() const;
 
   void ImportExternal(const CompactObj& src);
@@ -433,6 +433,8 @@ class CompactObj {
     uint16_t is_cool : 1;
     uint16_t is_reserved : 15;
 
+    // We do not have enough space in the common area to store page_index together with
+    // cool_record pointer. Therefore, we moved this field into TieredColdRecord itself.
     struct Offload {
       uint32_t page_index;
       uint32_t reserved;
@@ -440,7 +442,7 @@ class CompactObj {
     union {
       Offload offload;
-      detail::TieredColdRecord* cold_record;
+      detail::TieredColdRecord* cool_record;
     };
   } __attribute__((packed));
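
Note: the new comment is a size argument. On a 64-bit build both union alternatives occupy exactly 8 bytes, so a 32-bit page_index cannot sit next to the pointer without enlarging every CompactObj; the field therefore moved into TieredColdRecord. A standalone illustration of that constraint (layout simplified, assumes a 64-bit target, not the actual header):

#include <cstdint>

struct TieredColdRecord;  // forward declaration, as in the real header

struct Offload {
  uint32_t page_index;
  uint32_t reserved;
};

union ExternalUnion {
  Offload offload;                // 8 bytes
  TieredColdRecord* cool_record;  // also 8 bytes on a 64-bit target
};

// Both alternatives share the same 8 bytes; keeping page_index alongside the
// pointer would require 12+ bytes and grow the packed struct.
static_assert(sizeof(ExternalUnion) == 8, "union stays pointer-sized");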

View file

@@ -27,7 +27,9 @@ class CoolQueue {
   CompactObj Erase(detail::TieredColdRecord* record);
 
-  size_t UsedMemory() const;
+  size_t UsedMemory() const {
+    return used_memory_;
+  }
 
  private:
   detail::TieredColdRecord* head_ = nullptr;
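
Note: this hunk only inlines a trivial getter. For orientation, a queue exposing head_ like this is typically an intrusive doubly-linked list that maintains a running byte count as records are pushed and erased, which is what makes a constant-time UsedMemory() possible. A minimal sketch under that assumption (ToyCoolQueue and Record are illustrative, not the real CoolQueue):

#include <cstddef>

struct Record {
  Record* prev = nullptr;
  Record* next = nullptr;
  size_t bytes = 0;  // memory attributed to this record's value
};

class ToyCoolQueue {
 public:
  void PushFront(Record* r) {
    r->next = head_;
    if (head_) head_->prev = r;
    head_ = r;
    used_memory_ += r->bytes;  // accounting updated on insert...
  }

  void Erase(Record* r) {
    if (r->prev) r->prev->next = r->next;
    if (r->next) r->next->prev = r->prev;
    if (head_ == r) head_ = r->next;
    used_memory_ -= r->bytes;  // ...and on removal
  }

  size_t UsedMemory() const {  // cheap inline getter, as in the diff
    return used_memory_;
  }

 private:
  Record* head_ = nullptr;
  size_t used_memory_ = 0;
};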

View file

@@ -257,7 +257,7 @@ bool ParseDouble(string_view src, double* value) {
 #define ADD(x) (x) += o.x
 
 TieredStats& TieredStats::operator+=(const TieredStats& o) {
-  static_assert(sizeof(TieredStats) == 120);
+  static_assert(sizeof(TieredStats) == 128);
 
   ADD(total_stashes);
   ADD(total_fetches);
@@ -278,6 +278,7 @@ TieredStats& TieredStats::operator+=(const TieredStats& o) {
   ADD(small_bins_entries_cnt);
   ADD(small_bins_filling_bytes);
   ADD(total_stash_overflows);
+  ADD(cold_storage_bytes);
 
   return *this;
 }
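
Note: the sizeof static_assert acts as a tripwire. Appending cold_storage_bytes (8 bytes) grows TieredStats from 120 to 128 bytes, and compilation fails until both the assert and the ADD list are updated, so a new field cannot silently skip shard aggregation. The pattern in miniature (a toy struct assuming a 64-bit build, not the real TieredStats):

#include <cstddef>
#include <cstdint>

struct Stats {
  uint64_t stashes = 0;
  uint64_t fetches = 0;
  size_t cold_bytes = 0;  // the newly appended field

  Stats& operator+=(const Stats& o) {
    // Breaks the build if a field is added without extending the sums below.
    static_assert(sizeof(Stats) == 24, "update operator+= for the new field");
    stashes += o.stashes;
    fetches += o.fetches;
    cold_bytes += o.cold_bytes;
    return *this;
  }
};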

View file

@@ -82,6 +82,7 @@ struct TieredStats {
   uint64_t small_bins_cnt = 0;
   uint64_t small_bins_entries_cnt = 0;
   size_t small_bins_filling_bytes = 0;
+  size_t cold_storage_bytes = 0;
 
   TieredStats& operator+=(const TieredStats&);
 };

View file

@@ -419,7 +419,7 @@ void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) {
   });
 }
 
-void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t max_file_size) {
+void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
   CHECK(shard_ == nullptr) << pb->GetPoolIndex();
 
   mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
@@ -447,7 +447,7 @@ void EngineShard::InitTieredStorage(ProactorBase* pb, size_t max_file_size) {
   // TODO: enable tiered storage on non-default namespace
   DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
   auto* shard = EngineShard::tlocal();
-  shard->tiered_storage_ = make_unique<TieredStorage>(&db_slice, max_file_size);
+  shard->tiered_storage_ = make_unique<TieredStorage>(max_file_size, &db_slice);
   error_code ec = shard->tiered_storage_->Open(backing_prefix);
   CHECK(!ec) << ec.message();
 }
@@ -888,7 +888,7 @@ void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
   size_t max_shard_file_size = GetTieredFileLimit(sz);
   pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
     if (index < shard_queue_.size()) {
-      InitThreadLocal(pb, update_db_time, max_shard_file_size);
+      InitThreadLocal(pb, update_db_time);
     }
   });
@@ -909,8 +909,8 @@ void EngineShardSet::Shutdown() {
   RunBlockingInParallel([](EngineShard*) { EngineShard::DestroyThreadLocal(); });
 }
 
-void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t max_file_size) {
-  EngineShard::InitThreadLocal(pb, update_db_time, max_file_size);
+void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
+  EngineShard::InitThreadLocal(pb, update_db_time);
   EngineShard* es = EngineShard::tlocal();
   shard_queue_[es->shard_id()] = es->GetFiberQueue();
 }
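
Note: taken together, these hunks split shard startup into two phases: InitThreadLocal() no longer carries max_file_size at all, and the backing-file limit is passed only to InitTieredStorage(), which (per the header below) must run after all InitThreadLocal() calls have finished. A schematic of the resulting call order (toy types, not the real engine code):

#include <cstddef>
#include <vector>

struct Shard {
  void InitThreadLocal() {
    // Phase 1: per-thread state only; no tiering parameters needed here.
  }
  void InitTieredStorage(size_t max_file_size) {
    // Phase 2: runs after every shard exists, so tiered storage can safely
    // reference other per-thread structures.
    (void)max_file_size;
  }
};

void InitAll(std::vector<Shard>& shards, size_t max_shard_file_size) {
  for (auto& s : shards)  // phase 1 on every shard first
    s.InitThreadLocal();
  for (auto& s : shards)  // then phase 2, with the file limit passed here only
    s.InitTieredStorage(max_shard_file_size);
}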

View file

@@ -48,7 +48,7 @@ class EngineShard {
   // Sets up a new EngineShard in the thread.
   // If update_db_time is true, initializes periodic time update for its db_slice.
-  static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time, size_t max_file_size);
+  static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
 
   // Must be called after all InitThreadLocal() have finished
   void InitTieredStorage(util::ProactorBase* pb, size_t max_file_size);
@@ -342,7 +342,7 @@ class EngineShardSet {
   void TEST_EnableCacheMode();
 
  private:
-  void InitThreadLocal(util::ProactorBase* pb, bool update_db_time, size_t max_file_size);
+  void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
 
   util::ProactorPool* pp_;
   std::vector<TaskQueue*> shard_queue_;

View file

@@ -2190,6 +2190,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     append("tiered_small_bins_cnt", m.tiered_stats.small_bins_cnt);
     append("tiered_small_bins_entries_cnt", m.tiered_stats.small_bins_entries_cnt);
     append("tiered_small_bins_filling_bytes", m.tiered_stats.small_bins_filling_bytes);
+    append("tiered_cold_storage_bytes", m.tiered_stats.cold_storage_bytes);
   }
 
   if (should_enter("PERSISTENCE", true)) {
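
Note: with this line the TIERED section of INFO gains one more metric. Given the standard key:value format of INFO output, the new field would appear as (value illustrative):

tiered_cold_storage_bytes:1048576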

View file

@@ -17,6 +17,7 @@
 #include "base/logging.h"
 #include "server/common.h"
 #include "server/db_slice.h"
+#include "server/engine_shard_set.h"
 #include "server/snapshot.h"
 #include "server/table.h"
 #include "server/tiering/common.h"
@@ -26,8 +27,8 @@
 using namespace facade;
 
-ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 1_MB,
-          "In bytes. If memory budget on a shard goes blow this limit, tiering stops "
+ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 10_MB,
+          "In bytes. If memory budget on a shard goes below this limit, tiering stops "
           "hot-loading values into ram.");
 
 ABSL_FLAG(unsigned, tiered_storage_write_depth, 50,
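
Note: besides fixing the "goes blow" typo, this hunk raises the default margin from 1 MB to 10 MB. Conceptually, the margin is a floor on the shard's remaining memory budget below which hot-loading of offloaded values pauses; a hedged sketch of such a gate (the predicate name and budget source are illustrative, not Dragonfly's actual code):

#include <cstddef>

constexpr size_t kMemoryMargin = 10 * 1024 * 1024;  // mirrors the new 10_MB default

// Returns true when an offloaded value may be loaded back into RAM.
// Once the remaining budget falls below the margin, hot-loading stops.
bool CanHotLoad(size_t memory_budget_bytes) {
  return memory_budget_bytes > kMemoryMargin;
}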
@@ -250,7 +251,7 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
   return false;
 }
 
-TieredStorage::TieredStorage(DbSlice* db_slice, size_t max_size)
+TieredStorage::TieredStorage(size_t max_size, DbSlice* db_slice)
     : op_manager_{make_unique<ShardOpManager>(this, db_slice, max_size)},
       bins_{make_unique<tiering::SmallBins>()} {
   write_depth_limit_ = absl::GetFlag(FLAGS_tiered_storage_write_depth);
@@ -306,6 +307,8 @@ util::fb2::Future<T> TieredStorage::Modify(DbIndex dbid, std::string_view key,
                                            const PrimeValue& value,
                                            std::function<T(std::string*)> modf) {
   DCHECK(value.IsExternal());
+  DCHECK(!value.IsCool());  // TBD
+
   util::fb2::Future<T> future;
   PrimeValue decoder;
   decoder.ImportExternal(value);
@@ -419,6 +422,7 @@ TieredStats TieredStorage::GetStats() const {
   {  // Own stats
     stats.total_stash_overflows = stats_.stash_overflow_cnt;
+    stats.cold_storage_bytes = cool_queue_.UsedMemory();
   }
   return stats;
 }

View file

@@ -13,6 +13,7 @@
 #include <absl/container/flat_hash_map.h>
 
+#include "core/cool_queue.h"
 #include "server/common.h"
 #include "server/table.h"
@@ -34,7 +35,7 @@ class TieredStorage {
   // Min sizes of values taking up full page on their own
   const static size_t kMinOccupancySize = tiering::kPageSize / 2;
 
-  explicit TieredStorage(DbSlice* db_slice, size_t max_size);
+  explicit TieredStorage(size_t max_file_size, DbSlice* db_slice);
   ~TieredStorage();  // drop forward declared unique_ptrs
 
   TieredStorage(TieredStorage&& other) = delete;
@@ -79,11 +80,12 @@ class TieredStorage {
   // Returns if a value should be stashed
   bool ShouldStash(const PrimeValue& pv) const;
 
  private:
   PrimeTable::Cursor offloading_cursor_{};  // where RunOffloading left off
   std::unique_ptr<ShardOpManager> op_manager_;
   std::unique_ptr<tiering::SmallBins> bins_;
+  CoolQueue cool_queue_;
 
   unsigned write_depth_limit_ = 10;
   struct {
     uint64_t stash_overflow_cnt = 0;

View file

@@ -31,7 +31,8 @@ class OpManager {
   using KeyRef = std::pair<DbIndex, std::string_view>;
 
   // Two separate keyspaces are provided - one for strings, one for numeric identifiers.
-  // Ids can be used to track auxiliary values that don't map to real keys (like packed pages).
+  // Ids can be used to track auxiliary values that don't map to real keys (like a page index).
+  // Specifically, we track page indexes when serializing small-bin pages with multiple items.
   using EntryId = std::variant<unsigned, KeyRef>;
   using OwnedEntryId = std::variant<unsigned, std::pair<DbIndex, std::string>>;
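
Note: the expanded comment pins down what the unsigned alternative of EntryId is for: when a small-bin page holding several items is serialized, a page index is tracked instead of any single key. A minimal dispatch over such a variant (simplified to an owned string; Describe and the sample key are illustrative):

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <variant>

using DbIndex = uint16_t;
using KeyRef = std::pair<DbIndex, std::string>;  // owned string for simplicity
using EntryId = std::variant<unsigned, KeyRef>;  // page index OR a real key

void Describe(const EntryId& id) {
  if (const unsigned* page = std::get_if<unsigned>(&id)) {
    std::cout << "small-bin page #" << *page << "\n";  // auxiliary value
  } else {
    const KeyRef& key = std::get<KeyRef>(id);
    std::cout << "db " << key.first << ", key '" << key.second << "'\n";
  }
}

int main() {
  Describe(EntryId{42u});                     // a packed small-bin page
  Describe(EntryId{KeyRef{0, "user:1000"}});  // a real key
}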