Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2024-12-14 11:58:02 +00:00
chore: refactoring around tiered storage (#3299)
* chore: refactoring around tiered storage

  1. Renamed ReportXXX callbacks to NotifyXXX.
  2. Pulled RecordDeleted/RecordAdded out of TieredStorage::ShardOpManager.
  3. Moved TieredStorage::ShardOpManager functions to private scope.
  4. Streamlined code in TieredStorage::Delete.

  Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* fix: Preserve expiry upon uploading external values

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
parent 1daa80117c
commit f9ded47c3d
3 changed files with 47 additions and 42 deletions
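Note on item 2 of the commit message: the hunks below show only the call sites of the RecordAdded/RecordDeleted helpers (now fed a DbTableStats* via the new GetDbTableStats accessor), not their definitions. A minimal sketch of what such free helpers might look like, using stand-in types; tiered_entries is the counter used elsewhere in this diff, while tiered_used_bytes is an assumed field name:

#include <cstddef>

// Stand-in types for illustration only; the real PrimeValue and DbTableStats live in the
// Dragonfly sources.
struct PrimeValue {};
struct DbTableStats {
  size_t tiered_entries = 0;
  size_t tiered_used_bytes = 0;  // assumed field name
};

// Sketch of free helpers matching the call sites in the diff below.
inline void RecordAdded(const PrimeValue&, size_t serialized_len, DbTableStats* stats) {
  stats->tiered_entries++;
  stats->tiered_used_bytes += serialized_len;
}

inline void RecordDeleted(const PrimeValue&, size_t serialized_len, DbTableStats* stats) {
  stats->tiered_entries--;
  stats->tiered_used_bytes -= serialized_len;
}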
@@ -1423,13 +1423,10 @@ void DbSlice::ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbT
       });
     } while (cursor);
 
-    // Wait for delete operations to finish in sync.
-    // TODO: the logic inside tiered_storage that updates tiered_entries is somewhat fragile.
-    // To revisit it, otherwise we may have deadlocks around this code.
-    while (db_ptr->stats.tiered_entries > 0) {
-      LOG_EVERY_T(ERROR, 0.5) << "Long wait for tiered entry delete on flush";
-      ThisFiber::SleepFor(1ms);
-    }
+    // While tiered_storage may delete some of its entries asynchronously, it updates
+    // stats.tiered_entries immediately during the Delete call, therefore tiered_entries
+    // should be zero by this point.
+    CHECK_EQ(db_ptr->stats.tiered_entries, 0u);
   }
 }
 
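Note: the CHECK_EQ above relies on an ordering that the streamlined TieredStorage::Delete (later in this diff) makes explicit: the per-table tiered_entries counter is decremented synchronously inside Delete, and only the disk-segment reclamation is queued asynchronously. A toy model of that invariant, with made-up types:

#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Toy model: deleting an offloaded entry queues disk reclamation for later but updates the
// counter immediately, so the counter is zero as soon as all Delete calls have returned,
// even if the background queue has not drained yet.
struct ToyTieredStorage {
  size_t tiered_entries = 0;
  std::vector<std::function<void()>> pending;  // stands in for the async OpManager work queue

  void Add() { tiered_entries++; }

  void Delete() {
    pending.push_back([] { /* reclaim the disk segment later */ });
    tiered_entries--;  // synchronous accounting, mirroring the real Delete()
  }
};

int main() {
  ToyTieredStorage ts;
  ts.Add();
  ts.Add();
  ts.Delete();
  ts.Delete();
  assert(ts.tiered_entries == 0);  // holds even though ts.pending is not drained
  return 0;
}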
@@ -88,23 +88,16 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     ClearIoPending(key);
   }
 
-  // Set value to be an in-memory type again, either empty or with a value. Update memory stats
-  void SetInMemory(PrimeValue* pv, DbIndex dbid, string_view value, tiering::DiskSegment segment) {
-    pv->Reset();
-    if (!value.empty())
-      pv->SetString(value);
-
-    RecordDeleted(*pv, segment.length, db_slice_->MutableStats(dbid));
+  DbTableStats* GetDbTableStats(DbIndex dbid) {
+    return db_slice_->MutableStats(dbid);
   }
 
-  // Find entry by key and store it's up-to-date value in place of external segment.
-  // Returns false if the value is outdated, true otherwise
-  bool SetInMemory(OpManager::KeyRef key, string_view value, tiering::DiskSegment segment) {
-    if (auto pv = Find(key); pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
-      SetInMemory(pv, key.first, value, segment);
-      return true;
-    }
-    return false;
+ private:
+  PrimeValue* Find(OpManager::KeyRef key) {
+    // TODO: Get DbContext for transaction for correct dbid and time
+    // Bypass all update and stat mechanisms
+    auto it = db_slice_->GetDBTable(key.first)->prime.Find(key.second);
+    return IsValid(it) ? &it->second : nullptr;
   }
 
   // Load all values from bin by their hashes
@@ -124,19 +117,17 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
 
   bool NotifyDelete(tiering::DiskSegment segment) override;
 
- private:
-  PrimeValue* Find(OpManager::KeyRef key) {
-    // TODO: Get DbContext for transaction for correct dbid and time
-    // Bypass all update and stat mechanisms
-    auto it = db_slice_->GetDBTable(key.first)->prime.Find(key.second);
-    return IsValid(it) ? &it->second : nullptr;
+  // Set value to be an in-memory type again, either empty or with a value. Update memory stats
+  void Upload(DbIndex dbid, string_view value, size_t serialized_len, PrimeValue* pv) {
+    pv->Materialize(value);
+    RecordDeleted(*pv, serialized_len, GetDbTableStats(dbid));
   }
 
   // Find entry by key in db_slice and store external segment in place of original value.
   // Update memory stats
   void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
     if (auto pv = Find(key); pv) {
-      RecordAdded(*pv, segment.length, db_slice_->MutableStats(key.first));
+      RecordAdded(*pv, segment.length, GetDbTableStats(key.first));
 
       pv->SetIoPending(false);
       pv->SetExternal(segment.offset, segment.length);
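Note: the second commit in this PR ("Preserve expiry upon uploading external values") is carried by the Upload helper above. Where the removed SetInMemory overload called pv->Reset() before writing the string back, Upload calls pv->Materialize(value) on the live object. A purely illustrative toy model of why rebuilding the value in place can keep an expiry mark that a full reset would lose (this is not Dragonfly's actual PrimeValue layout):

#include <cassert>
#include <string>
#include <string_view>

// Illustrative value type: a payload plus metadata bits, loosely mimicking the difference
// between Reset()+SetString() and Materialize(). All member names here are made up.
struct ToyValue {
  std::string payload;
  bool external = true;
  bool has_expiry = true;

  void Reset() { payload.clear(); external = false; has_expiry = false; }  // wipes all metadata
  void SetString(std::string_view v) { payload = v; }

  // Swaps the external payload for an in-memory one, leaving other metadata untouched.
  void Materialize(std::string_view v) { payload = v; external = false; }
};

int main() {
  ToyValue a, b;

  a.Reset();           // old path: the expiry mark is gone before the value is written back
  a.SetString("v");
  assert(!a.has_expiry);

  b.Materialize("v");  // new path: only the storage representation changes
  assert(b.has_expiry);
  return 0;
}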
@@ -162,13 +153,13 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
   DbSlice* db_slice_;
 };
 
-void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, string_view value) {
+void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, string_view page) {
   // Note: Bin could've already been deleted, in that case DeleteBin returns an empty list
-  for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) {
+  for (auto [dbid, hash, item_segment] : ts_->bins_->DeleteBin(segment, page)) {
     // Search for key with the same hash and value pointing to the same segment.
     // If it still exists, it must correspond to the value stored in this bin
-    auto predicate = [sub_segment = sub_segment](const PrimeKey& key, const PrimeValue& probe) {
-      return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == sub_segment;
+    auto predicate = [item_segment](const PrimeKey& key, const PrimeValue& probe) {
+      return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == item_segment;
     };
     auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate);
     if (!IsValid(it))
@@ -177,8 +168,8 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
     stats_.total_defrags++;
 
     // Cut out relevant part of value and restore it to memory
-    string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length);
-    SetInMemory(&it->second, dbid, sub_value, sub_segment);
+    string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length);
+    Upload(dbid, value, item_segment.length, &it->second);
   }
 }
 
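Note: the substr arithmetic above translates a file offset into an offset inside the page buffer that was just read: the page starts at segment.offset on disk, so an item at item_segment.offset begins item_segment.offset - segment.offset bytes into the buffer. A self-contained example with made-up numbers and a stand-in segment struct:

#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>

// Stand-in for tiering::DiskSegment, assumed to be an {offset, length} pair.
struct Segment {
  size_t offset, length;
};

int main() {
  // A 4 KiB page read from file offset 8192; one item of the bin lives at file offset 8448.
  Segment page_segment{8192, 4096};
  Segment item_segment{8448, 120};

  std::string page(page_segment.length, 'x');  // pretend these bytes came from disk
  std::string_view value = std::string_view(page).substr(
      item_segment.offset - page_segment.offset,  // 256 bytes into the page buffer
      item_segment.length);

  assert(value.size() == item_segment.length);
  return 0;
}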
@@ -200,10 +191,17 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
   if (SliceSnapshot::IsSnaphotInProgress())
     return false;
 
-  SetInMemory(get<OpManager::KeyRef>(id), value, segment);
-  return true;
+  auto key = get<OpManager::KeyRef>(id);
+  auto* pv = Find(key);
+  if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
+    Upload(key.first, value, segment.length, pv);
+    return true;
+  }
+
+  LOG(DFATAL) << "Internal error, should not reach this";
+  return false;
 }
 
 bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
   if (OccupiesWholePages(segment.length))
     return true;
@@ -214,7 +212,7 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
   }
 
   if (bin.fragmented) {
-    // Trigger read to signal need for defragmentation. ReportFetched will handle it.
+    // Trigger read to signal need for defragmentation. NotifyFetched will handle it.
     VLOG(1) << "Enqueueing bin defragmentation for: x" << bin.segment.offset;
     Enqueue(kFragmentedBin, bin.segment, [](std::string*) { return false; });
   }
@@ -322,7 +320,8 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
 
   tiering::DiskSegment segment = value->GetExternalSlice();
   op_manager_->DeleteOffloaded(segment);
-  op_manager_->SetInMemory(value, dbid, string_view{}, segment);
+  value->Reset();
+  RecordDeleted(*value, segment.length, op_manager_->GetDbTableStats(dbid));
 }
 
 void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
@@ -185,17 +185,25 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {
   pp_->at(0)->AwaitBrief([] { EngineShard::tlocal()->TEST_EnableHeartbeat(); });
 
   // Stash all values
+  string value = BuildString(3000);
   for (size_t i = 0; i < kNum; i++) {
-    Run({"SET", absl::StrCat("k", i), BuildString(3000)});
+    Run({"SETEX", absl::StrCat("k", i), "100", value});
   }
 
   ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; });
   ASSERT_EQ(GetMetrics().tiered_stats.total_stashes, kNum);
   ASSERT_EQ(GetMetrics().db_stats[0].tiered_entries, kNum);
 
-  // Trigger re-fetch
+  // Trigger re-fetch and test TTL is preserved.
   for (size_t i = 0; i < kNum; i++) {
-    Run({"GET", absl::StrCat("k", i)});
+    string key = absl::StrCat("k", i);
+    auto resp = Run({"TTL", key});
+    EXPECT_THAT(resp, IntArg(100));
+
+    resp = Run({"GET", key});
+    EXPECT_EQ(resp, value);
+    resp = Run({"TTL", key});
+    EXPECT_THAT(resp, IntArg(100));
   }
 
   // Wait for offload to do it all again
@@ -230,6 +238,7 @@ TEST_F(TieredStorageTest, FlushAll) {
   Run({"FLUSHALL"});
 
   done = true;
+  util::ThisFiber::SleepFor(50ms);
   reader.Join();
 
   auto metrics = GetMetrics();