mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
fix(server): Sync FLUSH with tiering (#3098)
* fix(server): Sync FLUSH with tiering Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
ba0f2932e5
commit
137bd313ef
3 changed files with 71 additions and 13 deletions
|
@ -704,7 +704,15 @@ void DbSlice::FlushSlots(cluster::SlotRanges slot_ranges) {
|
|||
}
|
||||
|
||||
void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
|
||||
// TODO: add preemptiveness by yielding inside clear.
|
||||
// Async cleanup can only be performed if no tiered entries exist
|
||||
bool async_cleanup = true;
|
||||
for (DbIndex index : indexes) {
|
||||
async_cleanup &= db_arr_[index]->stats.tiered_entries == 0;
|
||||
}
|
||||
|
||||
if (!async_cleanup)
|
||||
ClearEntriesOnFlush(indexes, db_arr_, false);
|
||||
|
||||
DbTableArray flush_db_arr(db_arr_.size());
|
||||
for (DbIndex index : indexes) {
|
||||
auto& db = db_arr_[index];
|
||||
|
@ -715,19 +723,11 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
|
|||
CreateDb(index);
|
||||
std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks);
|
||||
}
|
||||
CHECK(fetched_items_.empty());
|
||||
auto cb = [this, flush_db_arr = std::move(flush_db_arr)]() mutable {
|
||||
for (auto& db_ptr : flush_db_arr) {
|
||||
if (db_ptr && db_ptr->stats.tiered_entries > 0) {
|
||||
for (auto it = db_ptr->prime.begin(); it != db_ptr->prime.end(); ++it) {
|
||||
if (it->second.IsExternal())
|
||||
PerformDeletion(Iterator::FromPrime(it), db_ptr.get());
|
||||
}
|
||||
|
||||
DCHECK_EQ(0u, db_ptr->stats.tiered_entries);
|
||||
db_ptr.reset();
|
||||
}
|
||||
}
|
||||
CHECK(fetched_items_.empty());
|
||||
auto cb = [this, async_cleanup, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
|
||||
if (async_cleanup)
|
||||
ClearEntriesOnFlush(indexes, flush_db_arr, true);
|
||||
flush_db_arr.clear();
|
||||
ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
|
||||
ServerState::kGlibcmalloc);
|
||||
|
@ -1382,6 +1382,30 @@ void DbSlice::InvalidateSlotWatches(const cluster::SlotSet& slot_ids) {
|
|||
}
|
||||
}
|
||||
|
||||
// Removes all tiered (externally stored) entries from the given databases before the
// tables are destroyed on flush. If `async` is false, blocks the calling fiber until
// every pending tiered deletion has completed; if true, performs only a best-effort
// traversal pass (intended to be called from a detached cleanup fiber after the db
// array has been swapped out).
void DbSlice::ClearEntriesOnFlush(absl::Span<const DbIndex> indices, const DbTableArray& db_arr,
                                  bool async) {
  for (auto index : indices) {
    const auto& db_ptr = db_arr[index];
    // Skip missing tables and tables that never had tiered entries.
    if (!db_ptr || db_ptr->stats.tiered_entries == 0)
      continue;

    // Delete all tiered entries
    PrimeTable::Cursor cursor;
    do {
      cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) {
        if (it->second.IsExternal())
          PerformDeletion(it, db_ptr.get());
      });
      // Re-traverse while tiered entries remain: deletions may complete
      // asynchronously, so a single pass is not guaranteed to clear the counter.
    } while (cursor && db_ptr->stats.tiered_entries > 0);

    // Wait for delete operations to finish in sync
    while (!async && db_ptr->stats.tiered_entries > 0) {
      // NOTE(review): presumably tiered deletions are completed by background I/O;
      // log periodically so a stuck flush is visible, then yield this fiber.
      LOG_EVERY_T(ERROR, 0.5) << "Long wait for tiered entry delete on flush";
      ThisFiber::SleepFor(1ms);
    }
  }
}
|
||||
|
||||
// Installs the document-deletion callback for this slice, replacing any
// previously registered one. The callback is taken by value and moved into
// the member, so the caller's copy is consumed.
void DbSlice::SetDocDeletionCallback(DocDeletionCallback ddcb) {
  doc_del_cb_ = std::move(ddcb);
}
|
||||
|
|
|
@ -486,6 +486,11 @@ class DbSlice {
|
|||
// Invalidate all watched keys for given slots. Used on FlushSlots.
|
||||
void InvalidateSlotWatches(const cluster::SlotSet& slot_ids);
|
||||
|
||||
// Properly clear db_arr before deleting it. If async is set, it's called from a detached fiber
|
||||
// after swapping the db.
|
||||
void ClearEntriesOnFlush(absl::Span<const DbIndex> indices, const DbTableArray& db_arr,
|
||||
bool async);
|
||||
|
||||
void PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table);
|
||||
|
||||
// Send invalidation message to the clients that are tracking the change to a key.
|
||||
|
|
|
@ -196,4 +196,33 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {
|
|||
EXPECT_EQ(metrics.tiered_stats.allocated_bytes, kNum * 4096);
|
||||
}
|
||||
|
||||
// Verifies that FLUSHALL correctly synchronizes with tiered storage: while one
// connection keeps reading offloaded values, a flush must still leave zero
// tiered entries behind (the sync-flush path must wait for tiered deletions).
TEST_F(TieredStorageTest, FlushAll) {
  absl::FlagSaver saver;
  absl::SetFlag(&FLAGS_tiered_offload_threshold, 0.0f);  // offload all values

  // kNum is size_t to avoid signed/unsigned comparisons with the loop index
  // and with the unsigned tiered_entries metric below.
  const size_t kNum = 500;

  // Populate with values large enough to be offloaded, then wait until the
  // offloader has externalized all of them.
  for (size_t i = 0; i < kNum; i++) {
    Run({"SET", absl::StrCat("k", i), string(3000, 'A')});
  }
  ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; });

  // Start reading random entries on a separate connection so reads race with
  // the flush below.
  atomic_bool done = false;
  auto reader = pp_->at(0)->LaunchFiber([&] {
    while (!done) {
      Run("reader", {"GET", absl::StrCat("k", rand() % kNum)});
    }
  });

  // Give the reader a head start, then flush everything.
  util::ThisFiber::SleepFor(50ms);
  Run({"FLUSHALL"});

  done = true;
  reader.Join();

  // After the flush no tiered entries may remain, and the reader must have
  // actually fetched some offloaded values during the race window.
  auto metrics = GetMetrics();
  EXPECT_EQ(metrics.db_stats.front().tiered_entries, 0u);
  EXPECT_GT(metrics.tiered_stats.total_fetches, 2u);
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Reference in a new issue