diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 2e815e47f..5485df232 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1423,13 +1423,10 @@ void DbSlice::ClearOffloadedEntries(absl::Span indices, const DbT }); } while (cursor); - // Wait for delete operations to finish in sync. - // TODO: the logic inside tiered_storage that updates tiered_entries is somewhat fragile. - // To revisit it, otherwise we may have deadlocks around this code. - while (db_ptr->stats.tiered_entries > 0) { - LOG_EVERY_T(ERROR, 0.5) << "Long wait for tiered entry delete on flush"; - ThisFiber::SleepFor(1ms); - } + // While tiered_storage may delete some of its entries asynchronously, it updates + // stats.tiered_entries immediately during the Delete call, therefore tiered_entries + // should be zero by this point. + CHECK_EQ(db_ptr->stats.tiered_entries, 0u); } } diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index b080a56c0..d75bd8ffa 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -88,23 +88,16 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { ClearIoPending(key); } - // Set value to be an in-memory type again, either empty or with a value. Update memory stats - void SetInMemory(PrimeValue* pv, DbIndex dbid, string_view value, tiering::DiskSegment segment) { - pv->Reset(); - if (!value.empty()) - pv->SetString(value); - - RecordDeleted(*pv, segment.length, db_slice_->MutableStats(dbid)); + DbTableStats* GetDbTableStats(DbIndex dbid) { + return db_slice_->MutableStats(dbid); } - // Find entry by key and store it's up-to-date value in place of external segment. - // Returns false if the value is outdated, true otherwise - bool SetInMemory(OpManager::KeyRef key, string_view value, tiering::DiskSegment segment) { - if (auto pv = Find(key); pv && pv->IsExternal() && segment == pv->GetExternalSlice()) { - SetInMemory(pv, key.first, value, segment); - return true; - } - return false; + private: + PrimeValue* Find(OpManager::KeyRef key) { + // TODO: Get DbContext for transaction for correct dbid and time + // Bypass all update and stat mechanisms + auto it = db_slice_->GetDBTable(key.first)->prime.Find(key.second); + return IsValid(it) ? &it->second : nullptr; } // Load all values from bin by their hashes @@ -124,19 +117,17 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { bool NotifyDelete(tiering::DiskSegment segment) override; - private: - PrimeValue* Find(OpManager::KeyRef key) { - // TODO: Get DbContext for transaction for correct dbid and time - // Bypass all update and stat mechanisms - auto it = db_slice_->GetDBTable(key.first)->prime.Find(key.second); - return IsValid(it) ? &it->second : nullptr; + // Set value to be an in-memory type again, either empty or with a value. Update memory stats + void Upload(DbIndex dbid, string_view value, size_t serialized_len, PrimeValue* pv) { + pv->Materialize(value); + RecordDeleted(*pv, serialized_len, GetDbTableStats(dbid)); } // Find entry by key in db_slice and store external segment in place of original value. // Update memory stats void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) { if (auto pv = Find(key); pv) { - RecordAdded(*pv, segment.length, db_slice_->MutableStats(key.first)); + RecordAdded(*pv, segment.length, GetDbTableStats(key.first)); pv->SetIoPending(false); pv->SetExternal(segment.offset, segment.length); @@ -162,13 +153,13 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { DbSlice* db_slice_; }; -void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, string_view value) { +void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, string_view page) { // Note: Bin could've already been deleted, in that case DeleteBin returns an empty list - for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) { + for (auto [dbid, hash, item_segment] : ts_->bins_->DeleteBin(segment, page)) { // Search for key with the same hash and value pointing to the same segment. // If it still exists, it must correspond to the value stored in this bin - auto predicate = [sub_segment = sub_segment](const PrimeKey& key, const PrimeValue& probe) { - return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == sub_segment; + auto predicate = [item_segment](const PrimeKey& key, const PrimeValue& probe) { + return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == item_segment; }; auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate); if (!IsValid(it)) @@ -177,8 +168,8 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str stats_.total_defrags++; // Cut out relevant part of value and restore it to memory - string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length); - SetInMemory(&it->second, dbid, sub_value, sub_segment); + string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length); + Upload(dbid, value, item_segment.length, &it->second); } } @@ -200,8 +191,15 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value, if (SliceSnapshot::IsSnaphotInProgress()) return false; - SetInMemory(get(id), value, segment); - return true; + auto key = get(id); + auto* pv = Find(key); + if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) { + Upload(key.first, value, segment.length, pv); + return true; + } + + LOG(DFATAL) << "Internal error, should not reach this"; + return false; } bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) { @@ -214,7 +212,7 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) { } if (bin.fragmented) { - // Trigger read to signal need for defragmentation. ReportFetched will handle it. + // Trigger read to signal need for defragmentation. NotifyFetched will handle it. VLOG(1) << "Enqueueing bin defragmentation for: x" << bin.segment.offset; Enqueue(kFragmentedBin, bin.segment, [](std::string*) { return false; }); } @@ -322,7 +320,8 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) { tiering::DiskSegment segment = value->GetExternalSlice(); op_manager_->DeleteOffloaded(segment); - op_manager_->SetInMemory(value, dbid, string_view{}, segment); + value->Reset(); + RecordDeleted(*value, segment.length, op_manager_->GetDbTableStats(dbid)); } void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) { diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index f97f097d8..daddaea82 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -185,17 +185,25 @@ TEST_F(TieredStorageTest, BackgroundOffloading) { pp_->at(0)->AwaitBrief([] { EngineShard::tlocal()->TEST_EnableHeartbeat(); }); // Stash all values + string value = BuildString(3000); for (size_t i = 0; i < kNum; i++) { - Run({"SET", absl::StrCat("k", i), BuildString(3000)}); + Run({"SETEX", absl::StrCat("k", i), "100", value}); } ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; }); ASSERT_EQ(GetMetrics().tiered_stats.total_stashes, kNum); ASSERT_EQ(GetMetrics().db_stats[0].tiered_entries, kNum); - // Trigger re-fetch + // Trigger re-fetch and test TTL is preserved. for (size_t i = 0; i < kNum; i++) { - Run({"GET", absl::StrCat("k", i)}); + string key = absl::StrCat("k", i); + auto resp = Run({"TTL", key}); + EXPECT_THAT(resp, IntArg(100)); + + resp = Run({"GET", key}); + EXPECT_EQ(resp, value); + resp = Run({"TTL", key}); + EXPECT_THAT(resp, IntArg(100)); } // Wait for offload to do it all again @@ -230,6 +238,7 @@ TEST_F(TieredStorageTest, FlushAll) { Run({"FLUSHALL"}); done = true; + util::ThisFiber::SleepFor(50ms); reader.Join(); auto metrics = GetMetrics();