
chore: introduce a cool queue that gradually retires cool items (#3377)

* chore: introduce a cool queue that gradually retires cool items

This PR introduces a new state in which the offloaded value is not freed from memory but instead stays
in the cool queue.

Upon Read, we convert the cool value back into the hot table and delete it from storage.
When we are low on memory, we retire the oldest cool values until we are back above the threshold.
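Conceptually, the cool queue is an LRU list of records that are already offloaded to disk but still hold their materialized RAM copy. Below is a minimal sketch of the idea; all names and types are hypothetical simplifications, not this PR's actual CoolQueue/TieredColdRecord API.

#include <cstddef>
#include <cstdint>
#include <list>
#include <string>

// Hypothetical, simplified stand-in for a cold record.
struct ColdRecord {
  uint16_t db_index = 0;
  uint32_t page_index = 0;  // disk offset == page_index * 4096 + page offset
  uint64_t key_hash = 0;    // locates the owning prime-table entry on retirement
  std::string value;        // the still-materialized RAM copy
};

class CoolQueueSketch {
 public:
  // A freshly offloaded value enters at the head: it now lives both on disk
  // and, temporarily, in RAM.
  ColdRecord* PushFront(ColdRecord rec) {
    used_bytes_ += rec.value.size() + sizeof(ColdRecord);
    items_.push_front(std::move(rec));
    return &items_.front();  // std::list pointers stay valid until erase
  }

  // Under memory pressure the oldest (tail) record is retired: the RAM copy
  // is dropped and the prime-table entry is flipped to fully external.
  bool PopBack(ColdRecord* out) {
    if (items_.empty())
      return false;
    used_bytes_ -= items_.back().value.size() + sizeof(ColdRecord);
    *out = std::move(items_.back());
    items_.pop_back();
    return true;
  }

  size_t UsedMemory() const { return used_bytes_; }

 private:
  std::list<ColdRecord> items_;  // head = most recently cooled, tail = oldest
  size_t used_bytes_ = 0;
};

On Read, the real implementation additionally erases the record in O(1) and moves its value straight back into the prime table, deleting the disk copy.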

This PR does not fully finish the feature, but it is workable enough to start (load) testing.
Missing:
a) Handle Modify operations
b) Retire cool items in more cases where we are low on memory. Specifically, refrain from evictions as long as cool items exist.

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2024-07-25 09:09:40 +03:00, committed by GitHub
parent 02b72c9042
commit 8a9c9adbc5
5 changed files with 220 additions and 43 deletions


@@ -387,12 +387,9 @@ EngineShard::~EngineShard() {
}
void EngineShard::Shutdown() {
queue_.Shutdown();
DVLOG(1) << "EngineShard::Shutdown";
if (tiered_storage_) {
tiered_storage_->Close();
tiered_storage_.reset();
}
queue_.Shutdown();
DCHECK(!fiber_periodic_.IsJoinable());
@@ -894,7 +891,14 @@ void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
}
void EngineShardSet::PreShutdown() {
RunBlockingInParallel([](EngineShard* shard) { shard->StopPeriodicFiber(); });
RunBlockingInParallel([](EngineShard* shard) {
shard->StopPeriodicFiber();
// We must close tiered_storage before we destroy namespaces that own db slices.
if (shard->tiered_storage()) {
shard->tiered_storage()->Close();
}
});
}
void EngineShardSet::Shutdown() {


@@ -31,8 +31,15 @@ ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 10_MB,
"In bytes. If memory budget on a shard goes below this limit, tiering stops "
"hot-loading values into ram.");
ABSL_FLAG(bool, tiered_experimental_cooling, false,
"If true, uses intermidate cooling layer "
"when offloading values to storage");
ABSL_FLAG(unsigned, tiered_storage_write_depth, 50,
"Maximum number of concurrent stash requests issued by background offload");
ABSL_FLAG(float, tiered_low_memory_factor, 0.1,
"Determines the low limit per shard that "
"tiered storage should not cross");
namespace dfly {
@@ -57,13 +64,6 @@ void RecordDeleted(const PrimeValue& pv, size_t tiered_len, DbTableStats* stats)
stats->tiered_used_bytes -= tiered_len;
}
// Called before overriding value with segment
void RecordAdded(const PrimeValue& pv, size_t tiered_len, DbTableStats* stats) {
stats->AddTypeMemoryUsage(pv.ObjType(), -pv.MallocUsed());
stats->tiered_entries++;
stats->tiered_used_bytes += tiered_len;
}
string DecodeString(bool is_raw, string_view str, PrimeValue decoder) {
if (is_raw) {
decoder.Materialize(str, true);
@@ -74,6 +74,13 @@ string DecodeString(bool is_raw, string_view str, PrimeValue decoder) {
return string{str};
}
tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
tiering::DiskSegment res;
res.length = item.serialized_size;
res.offset = size_t(item.record->page_index) * 4096 + item.page_offset;
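// e.g., page_index = 3, page_offset = 100  =>  offset = 3 * 4096 + 100 = 12388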
return res;
}
} // anonymous namespace
class TieredStorage::ShardOpManager : public tiering::OpManager {
@@ -103,6 +110,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
return db_slice_.MutableStats(dbid);
}
void DeleteOffloaded(DbIndex dbid, const tiering::DiskSegment& segment);
private:
PrimeValue* Find(OpManager::KeyRef key) {
// TODO: Get DbContext for transaction for correct dbid and time
@@ -128,6 +137,10 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
bool NotifyDelete(tiering::DiskSegment segment) override;
// If we are low on memory, remove entries from the ColdQueue,
// and promote their PrimeValues to be fully external.
void RetireColdEntries(size_t additional_memory);
// Set value to be an in-memory type again. Update memory stats.
void Upload(DbIndex dbid, string_view value, bool is_raw, size_t serialized_len, PrimeValue* pv) {
DCHECK(!value.empty());
@@ -139,13 +152,30 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
// Find entry by key in db_slice and store external segment in place of original value.
// Update memory stats
void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
if (auto pv = Find(key); pv) {
RecordAdded(*pv, segment.length, GetDbTableStats(key.first));
// TODO: rename to CoolEntry or something.
if (auto* pv = Find(key); pv) {
auto* stats = GetDbTableStats(key.first);
pv->SetStashPending(false);
pv->SetExternal(segment.offset, segment.length);
stats->tiered_entries++;
stats->tiered_used_bytes += segment.length;
stats_.total_stashes++;
if (absl::GetFlag(FLAGS_tiered_experimental_cooling)) {
RetireColdEntries(pv->MallocUsed());
detail::TieredColdRecord* record = ts_->cool_queue_.PushFront(
key.first, CompactObj::HashCode(key.second), segment.offset / 4096, std::move(*pv));
DCHECK(record);
pv->SetCool(segment.offset, segment.length, record);
DCHECK_EQ(pv->Size(), record->value.Size());
} else {
stats->AddTypeMemoryUsage(pv->ObjType(), -pv->MallocUsed());
pv->SetExternal(segment.offset, segment.length);
}
} else {
LOG(DFATAL) << "Should not reach here";
}
}
@@ -159,7 +189,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
return db_slice_.memory_budget() - memory_margin_ - value_len > 0;
}
int64_t memory_margin_ = 0;
int64_t memory_margin_ = 0; // TODO: get rid of memory_margin_
size_t memory_low_limit_;
struct {
uint64_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
@@ -184,10 +215,22 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
continue;
stats_.total_defrags++;
PrimeValue& pv = it->second;
if (pv.IsCool()) {
PrimeValue::CoolItem item = pv.GetCool();
tiering::DiskSegment segment = FromCoolItem(item);
// Cut out relevant part of value and restore it to memory
string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length);
Upload(dbid, value, true, item_segment.length, &it->second);
// We remove it from both cool storage and the offline storage.
PrimeValue hot = ts_->cool_queue_.Erase(item.record);
pv = std::move(hot);
auto* stats = GetDbTableStats(dbid);
stats->tiered_entries--;
stats->tiered_used_bytes -= segment.length;
} else {
// Cut out relevant part of value and restore it to memory
string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length);
Upload(dbid, value, true, item_segment.length, &pv);
}
}
}
@@ -240,7 +283,7 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
if (bin.fragmented) {
// Trigger read to signal need for defragmentation. NotifyFetched will handle it.
VLOG(1) << "Enqueueing bin defragmentation for: x" << bin.segment.offset;
DVLOG(2) << "Enqueueing bin defragmentation for: " << bin.segment.offset;
auto cb = [dummy = 5](bool, std::string*) -> bool {
(void)dummy; // a hack to make cb non-constexpr, since a constexpr cb confuses some old compilers.
return false;
@@ -251,10 +294,66 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
return false;
}
void TieredStorage::ShardOpManager::RetireColdEntries(size_t additional_memory) {
ssize_t threshold = memory_low_limit_ + additional_memory;
// we trigger if we are below the low point
if (db_slice_.memory_budget() < threshold) {
// But once below it, we try to rise back above the high watermark.
const double kHighFactor = 1.0;
size_t needed_to_free =
(threshold - db_slice_.memory_budget()) + memory_low_limit_ * kHighFactor;
size_t gained = 0;
do {
size_t memory_before = ts_->cool_queue_.UsedMemory();
detail::TieredColdRecord* record = ts_->cool_queue_.PopBack();
if (record == nullptr) // nothing to pull anymore
break;
gained += memory_before - ts_->cool_queue_.UsedMemory();
// Find the entry that points to the cool item and externalize it.
auto predicate = [record](const PrimeKey& key, const PrimeValue& probe) {
return probe.IsExternal() && probe.IsCool() && probe.GetCool().record == record;
};
PrimeIterator it =
db_slice_.GetDBTable(record->db_index)->prime.FindFirst(record->key_hash, predicate);
CHECK(IsValid(it));
PrimeValue& pv = it->second;
tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
// Now the item is only in storage.
pv.SetExternal(segment.offset, segment.length);
auto* stats = GetDbTableStats(record->db_index);
stats->AddTypeMemoryUsage(record->value.ObjType(), -record->value.MallocUsed());
CompactObj::DeleteMR<detail::TieredColdRecord>(record);
} while (gained < needed_to_free);
VLOG(1) << "Memory budget: " << db_slice_.memory_budget() << ", gained " << gained;
// Update memory_budget directly since we know that gained bytes were released.
// We will overwrite the budget correctly in the next Heartbeat.
db_slice_.SetCachedParams(gained + db_slice_.memory_budget(), db_slice_.bytes_per_object());
}
}
void TieredStorage::ShardOpManager::DeleteOffloaded(DbIndex dbid,
const tiering::DiskSegment& segment) {
auto* stats = GetDbTableStats(dbid);
OpManager::DeleteOffloaded(segment);
stats->tiered_used_bytes -= segment.length;
stats->tiered_entries--;
}
TieredStorage::TieredStorage(size_t max_size, DbSlice* db_slice)
: op_manager_{make_unique<ShardOpManager>(this, db_slice, max_size)},
bins_{make_unique<tiering::SmallBins>()} {
write_depth_limit_ = absl::GetFlag(FLAGS_tiered_storage_write_depth);
size_t mem_per_shard = max_memory_limit / shard_set->size();
SetMemoryLowLimit(absl::GetFlag(FLAGS_tiered_low_memory_factor) * mem_per_shard);
}
TieredStorage::~TieredStorage() {
@@ -268,38 +367,60 @@ void TieredStorage::Close() {
op_manager_->Close();
}
void TieredStorage::SetMemoryLowLimit(size_t mem_limit) {
op_manager_->memory_low_limit_ = mem_limit;
VLOG(1) << "Memory low limit is " << mem_limit;
}
util::fb2::Future<string> TieredStorage::Read(DbIndex dbid, string_view key,
const PrimeValue& value) {
DCHECK(value.IsExternal());
util::fb2::Future<string> future;
if (value.IsCool()) {
// If we read a cool record, bring it back to the primary table.
PrimeValue hot = Warmup(dbid, value.GetCool());
string tmp;
// The raw_val passed to cb might need decoding based on the encoding mask of the "value" object.
// We save the mask in decoder and use it to decode the final string that Read should resolve.
PrimeValue decoder;
decoder.ImportExternal(value);
DCHECK_EQ(value.Size(), hot.Size());
hot.GetString(&tmp);
future.Resolve(tmp);
auto cb = [future, decoder = std::move(decoder)](bool is_raw, const string* raw_val) mutable {
future.Resolve(DecodeString(is_raw, *raw_val, std::move(decoder)));
return false; // was not modified
};
// TODO: An awful hack - to fix later.
const_cast<PrimeValue&>(value) = std::move(hot);
} else {
// The raw_val passed to cb might need decoding based on the encoding mask of the "value"
// object. We save the mask in decoder and use it to decode the final string that Read should
// resolve.
PrimeValue decoder;
decoder.ImportExternal(value);
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
auto cb = [future, decoder = std::move(decoder)](bool is_raw, const string* raw_val) mutable {
future.Resolve(DecodeString(is_raw, *raw_val, std::move(decoder)));
return false; // was not modified
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
}
return future;
}
void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& value,
std::function<void(const std::string&)> readf) {
DCHECK(value.IsExternal());
if (value.IsCool()) {
util::fb2::Future<string> res = Read(dbid, key, value);
readf(res.Get());
} else {
PrimeValue decoder;
decoder.ImportExternal(value);
PrimeValue decoder;
decoder.ImportExternal(value);
auto cb = [readf = std::move(readf), decoder = std::move(decoder)](
bool is_raw, const string* raw_val) mutable {
readf(DecodeString(is_raw, *raw_val, std::move(decoder)));
return false;
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
auto cb = [readf = std::move(readf), decoder = std::move(decoder)](
bool is_raw, const string* raw_val) mutable {
readf(DecodeString(is_raw, *raw_val, std::move(decoder)));
return false;
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
}
}
template <typename T>
@@ -371,10 +492,22 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
DCHECK(value->IsExternal());
++stats_.total_deletes;
tiering::DiskSegment segment = value->GetExternalSlice();
op_manager_->DeleteOffloaded(segment);
tiering::DiskSegment segment;
if (value->IsCool()) {
// Delete the cool item.
PrimeValue::CoolItem item = value->GetCool();
segment.length = item.serialized_size;
segment.offset = item.page_offset + item.record->page_index * 4096;
PrimeValue hot = cool_queue_.Erase(item.record);
DCHECK_EQ(OBJ_STRING, hot.ObjType());
} else {
segment = value->GetExternalSlice();
}
// In any case we delete the offloaded segment and reset the value.
value->Reset();
RecordDeleted(*value, segment.length, op_manager_->GetDbTableStats(dbid));
stats_.total_deletes++;
op_manager_->DeleteOffloaded(dbid, segment);
}
void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
@@ -470,4 +603,18 @@ bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
}
PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
tiering::DiskSegment segment = FromCoolItem(item);
// We remove it from both cool storage and the offline storage.
PrimeValue hot = cool_queue_.Erase(item.record);
op_manager_->DeleteOffloaded(dbid, segment);
// Bring it back to the PrimeTable.
DCHECK(hot.ObjType() == OBJ_STRING);
hot.SetTouched(true);
return hot;
}
} // namespace dfly
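To make the RetireColdEntries math above concrete, here is a worked example with assumed numbers; the 0.1 factor matches the tiered_low_memory_factor default in this diff, while the shard figures are invented:

// Assume mem_per_shard = 100_MB and tiered_low_memory_factor = 0.1,
// hence memory_low_limit_ = 10_MB. A stash of a 1_MB value completes while
// db_slice_.memory_budget() == 8_MB:
//   threshold      = memory_low_limit_ + additional_memory = 10_MB + 1_MB = 11_MB
//   8_MB < 11_MB, so retirement triggers.
//   needed_to_free = (11_MB - 8_MB) + 10_MB * kHighFactor = 13_MB
// The loop then pops records off the queue tail, flipping each entry to fully
// external, until at least 13_MB is reclaimed or the cool queue is empty.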


@@ -44,6 +44,8 @@ class TieredStorage {
std::error_code Open(std::string_view path);
void Close();
void SetMemoryLowLimit(size_t mem_limit);
// Reads an offloaded value. It must be of external type.
util::fb2::Future<std::string> Read(DbIndex dbid, std::string_view key, const PrimeValue& value);
@@ -80,6 +82,9 @@ class TieredStorage {
// Returns if a value should be stashed
bool ShouldStash(const PrimeValue& pv) const;
// Returns the primary value, and deletes the cool item as well as its offloaded storage.
PrimeValue Warmup(DbIndex dbid, PrimeValue::CoolItem item);
PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off
std::unique_ptr<ShardOpManager> op_manager_;
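For callers the cool path is transparent: Read resolves immediately from the RAM copy for cool values and only goes to disk for fully external ones. A usage sketch, with the shard context and variable names assumed rather than taken from this PR:

// Hypothetical call site inside a shard fiber:
//   TieredStorage* ts = shard->tiered_storage();
//   const PrimeValue& pv = ...;  // must satisfy pv.IsExternal()
util::fb2::Future<std::string> fut = ts->Read(0 /*dbid*/, "key", pv);
std::string payload = fut.Get();  // immediate for cool values, after disk I/O otherwise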


@@ -20,6 +20,7 @@
using namespace std;
using namespace testing;
using namespace util;
ABSL_DECLARE_FLAG(bool, force_epoll);
ABSL_DECLARE_FLAG(string, tiered_prefix);
@@ -286,4 +287,22 @@ TEST_F(TieredStorageTest, FlushPending) {
EXPECT_EQ(GetMetrics().tiered_stats.small_bins_filling_bytes, 0u);
}
TEST_F(TieredStorageTest, MemoryPressure) {
max_memory_limit = 20_MB;
pp_->at(0)->AwaitBrief([] {
EngineShard::tlocal()->TEST_EnableHeartbeat();
EngineShard::tlocal()->tiered_storage()->SetMemoryLowLimit(2_MB);
});
constexpr size_t kNum = 10000;
for (size_t i = 0; i < kNum; i++) {
auto resp = Run({"SET", absl::StrCat("k", i), BuildString(10000)});
ASSERT_EQ(resp, "OK") << i;
// TODO: remove this once used_mem_current is updated more frequently.
ThisFiber::SleepFor(10us);
}
EXPECT_LT(used_mem_peak.load(), 20_MB);
}
} // namespace dfly
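For scale: MemoryPressure writes roughly 10000 × 10 KB ≈ 100 MB of values against a 20 MB memory limit and a 2 MB low-memory mark, so the EXPECT_LT on used_mem_peak only passes if background offloading and cool-item retirement keep pace with the write load.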


@@ -34,6 +34,8 @@ OpManager::OpManager(size_t max_size) : storage_{max_size} {
}
OpManager::~OpManager() {
DCHECK(pending_stash_ver_.empty());
DCHECK(pending_reads_.empty());
}
std::error_code OpManager::Open(std::string_view file) {