diff --git a/src/server/common.cc b/src/server/common.cc
index 77a290aaa..761b1de20 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -257,34 +257,22 @@ bool ParseDouble(string_view src, double* value) {
 
 #define ADD(x) (x) += o.x
 
-IoMgrStats& IoMgrStats::operator+=(const IoMgrStats& rhs) {
-  static_assert(sizeof(IoMgrStats) == 16);
-
-  read_total += rhs.read_total;
-  read_delay_usec += rhs.read_delay_usec;
-
-  return *this;
-}
-
 TieredStats& TieredStats::operator+=(const TieredStats& o) {
-  static_assert(sizeof(TieredStats) == 48);
-
-  ADD(tiered_writes);
-  ADD(storage_capacity);
-  ADD(storage_reserved);
-  ADD(aborted_write_cnt);
-  ADD(flush_skip_cnt);
-  ADD(throttled_write_cnt);
-
-  return *this;
-}
-
-TieredStatsV2& TieredStatsV2::operator+=(const TieredStatsV2& o) {
-  static_assert(sizeof(TieredStatsV2) == 24);
+  static_assert(sizeof(TieredStats) == 80);
 
   ADD(total_stashes);
   ADD(total_fetches);
+  ADD(total_cancels);
+  ADD(allocated_bytes);
+  ADD(capacity_bytes);
+
+  ADD(pending_read_cnt);
+  ADD(pending_stash_cnt);
+
+  ADD(small_bins_cnt);
+  ADD(small_bins_entries_cnt);
+  ADD(small_bins_filling_bytes);
 
   return *this;
 }

diff --git a/src/server/common.h b/src/server/common.h
index 54a605571..25a8b1efd 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -59,33 +59,22 @@ struct LockTagOptions {
   static const LockTagOptions& instance();
 };
 
-struct IoMgrStats {
-  uint64_t read_total = 0;
-  uint64_t read_delay_usec = 0;
-
-  IoMgrStats& operator+=(const IoMgrStats& rhs);
-};
-
 struct TieredStats {
-  uint64_t tiered_writes = 0;
-
-  size_t storage_capacity = 0;
-
-  // how much was reserved by actively stored items.
-  size_t storage_reserved = 0;
-  uint64_t aborted_write_cnt = 0;
-  uint64_t flush_skip_cnt = 0;
-  uint64_t throttled_write_cnt = 0;
-
-  TieredStats& operator+=(const TieredStats&);
-};
-
-struct TieredStatsV2 {
   size_t total_stashes = 0;
   size_t total_fetches = 0;
-  size_t allocated_bytes = 0;
+  size_t total_cancels = 0;
 
-  TieredStatsV2& operator+=(const TieredStatsV2&);
+  size_t allocated_bytes = 0;
+  size_t capacity_bytes = 0;
+
+  size_t pending_read_cnt = 0;
+  size_t pending_stash_cnt = 0;
+
+  size_t small_bins_cnt = 0;
+  size_t small_bins_entries_cnt = 0;
+  size_t small_bins_filling_bytes = 0;
+
+  TieredStats& operator+=(const TieredStats&);
 };
 
 struct SearchStats {
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index dd8e73212..6d6978877 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1144,15 +1144,6 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
                               MetricType::COUNTER, &resp->body());
     AppendMetricWithoutLabels("reply_total", "", m.facade_stats.reply_stats.send_stats.count,
                               MetricType::COUNTER, &resp->body());
-
-    // Tiered metrics.
-    if (m.disk_stats.read_total > 0) {
-      AppendMetricWithoutLabels("tiered_reads_total", "", m.disk_stats.read_total,
-                                MetricType::COUNTER, &resp->body());
-      AppendMetricWithoutLabels("tiered_reads_latency_seconds", "",
-                                double(m.disk_stats.read_delay_usec) * 1e-6, MetricType::COUNTER,
-                                &resp->body());
-    }
   }
 
   AppendMetricWithoutLabels("script_error_total", "", m.facade_stats.reply_stats.script_error_count,
@@ -1845,7 +1836,7 @@ Metrics ServerFamily::GetMetrics() const {
     result.shard_stats += shard->stats();
 
     if (shard->tiered_storage_v2()) {
-      result.tiered_stats_v2 += shard->tiered_storage_v2()->GetStats();
+      result.tiered_stats += shard->tiered_storage_v2()->GetStats();
     }
 
     if (shard->search_indices()) {
@@ -2057,23 +2048,19 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
   }
 
   if (should_enter("TIERED", true)) {
-    append("tiered_entries", total.tiered_entries);
-    append("tiered_bytes", total.tiered_size);
-    append("tiered_bytes_human", HumanReadableNumBytes(total.tiered_size));
-    append("tiered_reads", m.disk_stats.read_total);
-    append("tiered_read_latency_usec", m.disk_stats.read_delay_usec);
-    append("tiered_writes", m.tiered_stats.tiered_writes);
-    append("tiered_reserved", m.tiered_stats.storage_reserved);
-    append("tiered_capacity", m.tiered_stats.storage_capacity);
-    append("tiered_aborted_writes", m.tiered_stats.aborted_write_cnt);
-    append("tiered_flush_skipped", m.tiered_stats.flush_skip_cnt);
-    append("tiered_throttled_writes", m.tiered_stats.throttled_write_cnt);
-  }
+    append("tiered_total_stashes", m.tiered_stats.total_stashes);
+    append("tiered_total_fetches", m.tiered_stats.total_fetches);
+    append("tiered_total_cancels", m.tiered_stats.total_cancels);
 
-  if (should_enter("TIERED_V2", true)) {
-    append("tiered_v2_total_stashes", m.tiered_stats_v2.total_stashes);
-    append("tiered_v2_total_fetches", m.tiered_stats_v2.total_fetches);
-    append("tiered_v2_allocated_bytes", m.tiered_stats_v2.allocated_bytes);
+    append("tiered_allocated_bytes", m.tiered_stats.allocated_bytes);
+    append("tiered_capacity_bytes", m.tiered_stats.capacity_bytes);
+
+    append("tiered_pending_read_cnt", m.tiered_stats.pending_read_cnt);
+    append("tiered_pending_stash_cnt", m.tiered_stats.pending_stash_cnt);
+
+    append("tiered_small_bins_cnt", m.tiered_stats.small_bins_cnt);
+    append("tiered_small_bins_entries_cnt", m.tiered_stats.small_bins_entries_cnt);
+    append("tiered_small_bins_filling_bytes", m.tiered_stats.small_bins_filling_bytes);
   }
 
   if (should_enter("PERSISTENCE", true)) {
diff --git a/src/server/server_family.h b/src/server/server_family.h
index 80fb0adc8..466805863 100644
--- a/src/server/server_family.h
+++ b/src/server/server_family.h
@@ -80,10 +80,8 @@ struct Metrics {
   EngineShard::Stats shard_stats;     // per-shard stats
   facade::FacadeStats facade_stats;   // client stats and buffer sizes
 
-  TieredStats tiered_stats;  // stats for tiered storage
-  TieredStatsV2 tiered_stats_v2;
+  TieredStats tiered_stats;
 
-  IoMgrStats disk_stats;  // disk stats for io_mgr
   SearchStats search_stats;
   ServerState::Stats coordinator_stats;  // stats on transaction running
   PeakStats peak_stats;
diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc
index f3912d402..6bb78541c 100644
--- a/src/server/tiered_storage.cc
+++ b/src/server/tiered_storage.cc
@@ -58,8 +58,10 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
 
   // Clear IO pending flag for entry
   void ClearIoPending(string_view key) {
-    if (auto pv = Find(key); pv)
+    if (auto pv = Find(key); pv) {
       pv->SetIoPending(false);
+      stats_.total_cancels++;
+    }
   }
 
   // Clear IO pending flag for all contained entries of bin
@@ -109,13 +111,9 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
     }
   }
 
-  TieredStatsV2 GetStats() const {
-    auto stats = stats_;
-    stats.allocated_bytes = OpManager::storage_.GetStats().allocated_bytes;
-    return stats;
-  }
-
  private:
+  friend class TieredStorageV2;
+
   PrimeValue* Find(string_view key) {
     // TODO: Get DbContext for transaction for correct dbid and time
     auto it = db_slice_->FindMutable(DbContext{}, key);
@@ -124,7 +122,9 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
 
   bool cache_fetched_ = false;
 
-  TieredStatsV2 stats_;
+  struct {
+    size_t total_stashes = 0, total_fetches = 0, total_cancels = 0;
+  } stats_;
 
   TieredStorageV2* ts_;
   DbSlice* db_slice_;
@@ -218,8 +218,32 @@ bool TieredStorageV2::ShouldStash(const PrimeValue& pv) {
   return !pv.IsExternal() && pv.ObjType() == OBJ_STRING && pv.Size() >= kMinValueSize;
 }
 
-TieredStatsV2 TieredStorageV2::GetStats() const {
-  return op_manager_->GetStats();
+TieredStats TieredStorageV2::GetStats() const {
+  TieredStats stats{};
+
+  {  // ShardOpManager stats
+    auto shard_stats = op_manager_->stats_;
+    stats.total_fetches = shard_stats.total_fetches;
+    stats.total_stashes = shard_stats.total_stashes;
+    stats.total_cancels = shard_stats.total_cancels;
+  }
+
+  {  // OpManager stats
+    tiering::OpManager::Stats op_stats = op_manager_->GetStats();
+    stats.pending_read_cnt = op_stats.pending_read_cnt;
+    stats.pending_stash_cnt = op_stats.pending_stash_cnt;
+    stats.allocated_bytes = op_stats.disk_stats.allocated_bytes;
+    stats.capacity_bytes = op_stats.disk_stats.capacity_bytes;
+  }
+
+  {  // SmallBins stats
+    tiering::SmallBins::Stats bins_stats = bins_->GetStats();
+    stats.small_bins_cnt = bins_stats.stashed_bins_cnt;
+    stats.small_bins_entries_cnt = bins_stats.stashed_entries_cnt;
+    stats.small_bins_filling_bytes = bins_stats.current_bin_bytes;
+  }
+
+  return stats;
 }
 
 }  // namespace dfly
diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h
index 49c804935..ad692f941 100644
--- a/src/server/tiered_storage.h
+++ b/src/server/tiered_storage.h
@@ -58,7 +58,7 @@ class TieredStorageV2 {
   // Returns if a value should be stashed
   bool ShouldStash(const PrimeValue& pv);
 
-  TieredStatsV2 GetStats() const;
+  TieredStats GetStats() const;
 
  private:
   std::unique_ptr<ShardOpManager> op_manager_;
@@ -108,8 +108,8 @@ class TieredStorageV2 {
   void Delete(std::string_view key, PrimeValue* value) {
   }
 
-  TieredStatsV2 GetStats() {
-    return TieredStatsV2{};
+  TieredStats GetStats() {
+    return TieredStats{};
   }
 };

diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc
index 35ac496b5..9dfdf3b0e 100644
--- a/src/server/tiered_storage_test.cc
+++ b/src/server/tiered_storage_test.cc
@@ -63,7 +63,7 @@ TEST_F(TieredStorageV2Test, SimpleGetSet) {
   // Make sure all entries were stashed, except the one few not filling a small page
   size_t stashes = 0;
   ExpectConditionWithinTimeout([this, &stashes] {
-    stashes = GetMetrics().tiered_stats_v2.total_stashes;
+    stashes = GetMetrics().tiered_stats.total_stashes;
     return stashes >= kMax - 64 - 1;
   });

diff --git a/src/server/tiering/disk_storage.cc b/src/server/tiering/disk_storage.cc
index 05d0c171d..38aaa6332 100644
--- a/src/server/tiering/disk_storage.cc
+++ b/src/server/tiering/disk_storage.cc
@@ -119,7 +119,6 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
   memcpy(buf.bytes.data(), bytes.data(), bytes.length());
 
   auto io_cb = [this, cb, offset, buf, len = bytes.size()](int io_res) {
-    VLOG(0) << "IoRes " << io_res << " " << len;
     if (io_res < 0) {
       MarkAsFree({size_t(offset), len});
       cb({}, std::error_code{-io_res, std::system_category()});
@@ -134,7 +133,7 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
 }
 
 DiskStorage::Stats DiskStorage::GetStats() const {
-  return {alloc_.allocated_bytes()};
+  return {alloc_.allocated_bytes(), alloc_.capacity()};
 }
 
 }  // namespace dfly::tiering
diff --git a/src/server/tiering/disk_storage.h b/src/server/tiering/disk_storage.h
index 90127311d..96b1cc8b6 100644
--- a/src/server/tiering/disk_storage.h
+++ b/src/server/tiering/disk_storage.h
@@ -19,6 +19,7 @@ class DiskStorage {
  public:
   struct Stats {
     size_t allocated_bytes = 0;
+    size_t capacity_bytes = 0;
   };
 
   using ReadCb = std::function;
diff --git a/src/server/tiering/disk_storage_test.cc b/src/server/tiering/disk_storage_test.cc
index 893919237..7b4fd9cf0 100644
--- a/src/server/tiering/disk_storage_test.cc
+++ b/src/server/tiering/disk_storage_test.cc
@@ -8,6 +8,7 @@
 
 #include "base/gtest.h"
 #include "base/logging.h"
+#include "server/tiering/common.h"
 #include "server/tiering/test_common.h"
 #include "util/fibers/fibers.h"
 #include "util/fibers/pool.h"
@@ -59,12 +60,16 @@ struct DiskStorageTest : public PoolTestBase {
     last_reads_.erase(index);
   }
 
-  void Wait() {
+  void Wait() const {
     while (pending_ops_ > 0) {
       ::util::ThisFiber::SleepFor(1ms);
     }
   }
 
+  DiskStorage::Stats GetStats() const {
+    return storage_->GetStats();
+  }
+
 protected:
   int pending_ops_ = 0;
 
@@ -82,6 +87,8 @@ TEST_F(DiskStorageTest, Basic) {
     Wait();
     EXPECT_EQ(segments_.size(), 100);
 
+    EXPECT_EQ(GetStats().allocated_bytes, 100 * kPageSize);
+
     // Read all 100 values
     for (size_t i = 0; i < 100; i++)
       Read(i);
@@ -91,6 +98,11 @@ TEST_F(DiskStorageTest, Basic) {
     for (size_t i = 0; i < 100; i++)
       EXPECT_EQ(last_reads_[i], absl::StrCat("value", i));
 
+    // Delete all values
+    for (size_t i = 0; i < 100; i++)
+      Delete(i);
+    EXPECT_EQ(GetStats().allocated_bytes, 0);
+
     Close();
   });
 }
diff --git a/src/server/tiering/io_mgr.h b/src/server/tiering/io_mgr.h
index e5a5538e9..c88102af5 100644
--- a/src/server/tiering/io_mgr.h
+++ b/src/server/tiering/io_mgr.h
@@ -47,16 +47,11 @@ class IoMgr {
     return grow_progress_;
   }
 
-  const IoMgrStats& GetStats() const {
-    return stats_;
-  }
-
  private:
   std::unique_ptr backing_file_;
   size_t sz_ = 0;
 
   bool grow_progress_ = false;
-  IoMgrStats stats_;
 };
 
 }  // namespace dfly::tiering
diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc
index be7ed9e5a..2206c63fe 100644
--- a/src/server/tiering/op_manager.cc
+++ b/src/server/tiering/op_manager.cc
@@ -44,7 +44,7 @@ void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) {
 void OpManager::Delete(EntryId id) {
   // If the item isn't offloaded, it has io pending, so cancel it
   DCHECK(pending_stash_ver_.count(ToOwned(id)));
-  ++pending_stash_ver_[ToOwned(id)];
+  pending_stash_ver_.erase(ToOwned(id));
 }
 
 void OpManager::Delete(DiskSegment segment) {
@@ -89,7 +89,8 @@ void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment
       it != pending_stash_ver_.end() && it->second == version) {
     pending_stash_ver_.erase(it);
     ReportStashed(id, segment, ec);
-  } else {
+  } else if (!ec) {
+    // Throw away the value because it's no longer up-to-date even if no error occurred
     storage_.MarkAsFree(segment);
   }
 }
@@ -125,4 +126,10 @@ OpManager::EntryOps& OpManager::ReadOp::ForId(EntryId id, DiskSegment key_segmen
   return key_ops.emplace_back(ToOwned(id), key_segment);
 }
 
+OpManager::Stats OpManager::GetStats() const {
+  return {.disk_stats = storage_.GetStats(),
+          .pending_read_cnt = pending_reads_.size(),
+          .pending_stash_cnt = pending_stash_ver_.size()};
+}
+
 }  // namespace dfly::tiering
diff --git a/src/server/tiering/op_manager.h b/src/server/tiering/op_manager.h
index 6a726dd8d..c7463f631 100644
--- a/src/server/tiering/op_manager.h
+++ b/src/server/tiering/op_manager.h
@@ -20,10 +20,18 @@ namespace dfly::tiering {
 // safely schedules deletes after reads and allows cancelling pending stashes
 class OpManager {
  public:
+  struct Stats {
+    DiskStorage::Stats disk_stats;
+
+    size_t pending_read_cnt = 0;
+    size_t pending_stash_cnt = 0;
+  };
+
   // Two separate keyspaces are provided - one for strings, one for numeric identifiers.
   // Ids can be used to track auxiliary values that don't map to real keys (like packed pages).
   using EntryId = std::variant<unsigned, std::string_view>;
   using OwnedEntryId = std::variant<unsigned, std::string>;
 
+  // Callback for post-read completion. Returns whether the value was modified
   using ReadCallback = std::function<bool(std::string*)>;
 
@@ -47,6 +55,8 @@ class OpManager {
   // Stash value to be offloaded
   std::error_code Stash(EntryId id, std::string_view value);
 
+  Stats GetStats() const;
+
 protected:
   // Report that a stash succeeded and the entry was stored at the provided segment or failed with
   // given error
diff --git a/src/server/tiering/op_manager_test.cc b/src/server/tiering/op_manager_test.cc
index 453ab729b..e197540d4 100644
--- a/src/server/tiering/op_manager_test.cc
+++ b/src/server/tiering/op_manager_test.cc
@@ -63,9 +63,13 @@ TEST_F(OpManagerTest, SimpleStashesWithReads) {
     EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "real")));
   }
 
+  EXPECT_EQ(GetStats().pending_stash_cnt, 100);
+
   while (stashed_.size() < 100)
     util::ThisFiber::SleepFor(1ms);
 
+  EXPECT_EQ(GetStats().disk_stats.allocated_bytes, 100 * kPageSize);
+
   for (unsigned i = 0; i < 100; i++) {
     EXPECT_GE(stashed_[i].offset, i > 0);
     EXPECT_EQ(stashed_[i].length, 10 + (i > 9));
diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc
index 5dcdf6044..eedad5a67 100644
--- a/src/server/tiering/small_bins.cc
+++ b/src/server/tiering/small_bins.cc
@@ -59,7 +59,7 @@ SmallBins::FilledBin SmallBins::FlushBin() {
     data += value.size();
   }
 
-  current_bin_bytes_ = 0;  // num hashes
+  current_bin_bytes_ = 0;
   current_bin_.erase(current_bin_.begin(), current_bin_.end());
 
   return {id, std::move(out)};
@@ -66,10 +66,14 @@ SmallBins::FilledBin SmallBins::FlushBin() {
 }
 
 SmallBins::KeySegmentList SmallBins::ReportStashed(BinId id, DiskSegment segment) {
   auto key_list = pending_bins_.extract(id);
+  DCHECK_GT(key_list.mapped().size(), 0u);
+
   auto segment_list = SmallBins::KeySegmentList{key_list.mapped().begin(), key_list.mapped().end()};
   for (auto& [_, sub_segment] : segment_list)
     sub_segment.offset += segment.offset;
+
+  stats_.total_stashed_entries += segment_list.size();
 
   return segment_list;
 }
@@ -98,12 +102,20 @@ std::optional<DiskSegment> SmallBins::Delete(std::string_view key) {
 
 std::optional<DiskSegment> SmallBins::Delete(DiskSegment segment) {
   segment = segment.FillPages();
-  if (auto it = stashed_bins_.find(segment.offset);
-      it != stashed_bins_.end() && --it->second == 0) {
-    stashed_bins_.erase(it);
-    return segment;
+  if (auto it = stashed_bins_.find(segment.offset); it != stashed_bins_.end()) {
+    stats_.total_stashed_entries--;
+    if (--it->second == 0) {
+      stashed_bins_.erase(it);
+      return segment;
+    }
   }
   return std::nullopt;
 }
+
+SmallBins::Stats SmallBins::GetStats() const {
+  return Stats{.stashed_bins_cnt = stashed_bins_.size(),
+               .stashed_entries_cnt = stats_.total_stashed_entries,
+               .current_bin_bytes = current_bin_bytes_};
+}
+
 }  // namespace dfly::tiering
diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h
index 082369ca3..2697cf575 100644
--- a/src/server/tiering/small_bins.h
+++ b/src/server/tiering/small_bins.h
@@ -19,6 +19,12 @@ namespace dfly::tiering {
 // SIMPLEST VERSION for now.
 class SmallBins {
  public:
+  struct Stats {
+    size_t stashed_bins_cnt = 0;
+    size_t stashed_entries_cnt = 0;
+    size_t current_bin_bytes = 0;
+  };
+
   using BinId = unsigned;
 
   // Bin filled with blob of serialized entries
@@ -42,6 +48,8 @@ class SmallBins {
   // Delete a stored segment. Returns page segment if it became emtpy and needs to be deleted.
   std::optional<DiskSegment> Delete(DiskSegment segment);
 
+  Stats GetStats() const;
+
  private:
   // Flush current bin
   FilledBin FlushBin();
@@ -58,6 +66,10 @@ class SmallBins {
 
   // Map of bins that were stashed and should be deleted when refcount reaches 0
   absl::flat_hash_map<size_t, unsigned> stashed_bins_;
+
+  struct {
+    size_t total_stashed_entries = 0;
+  } stats_;
 };
 
 };  // namespace dfly::tiering
diff --git a/tests/dragonfly/tiering_test.py b/tests/dragonfly/tiering_test.py
index 1c9567b24..0616cd4ee 100644
--- a/tests/dragonfly/tiering_test.py
+++ b/tests/dragonfly/tiering_test.py
@@ -21,18 +21,19 @@ async def test_tiering_simple(async_client: aioredis.Redis):
 
     # Wait for all to be offloaded
     with async_timeout.timeout(1):
-        info = await async_client.info("TIERED_V2")
-        while info["tiered_v2_total_stashes"] != 100:
-            info = await async_client.info("TIERED_V2")
+        info = await async_client.info("TIERED")
+        print(info)
+        while info["tiered_total_stashes"] != 100:
+            info = await async_client.info("TIERED")
             await asyncio.sleep(0.1)
-    assert 3000 * 100 <= info["tiered_v2_allocated_bytes"] <= 4096 * 100
+    assert 3000 * 100 <= info["tiered_allocated_bytes"] <= 4096 * 100
 
     # Fetch back
     for key in (f"k{i}" for i in range(1, 100 + 1)):
         assert len(await async_client.execute_command("GET", key)) == 3000
-    assert (await async_client.info("TIERED_V2"))["tiered_v2_total_fetches"] == 100
+    assert (await async_client.info("TIERED"))["tiered_total_fetches"] == 100
 
     # Wait to be deleted
     with async_timeout.timeout(1):
-        while (await async_client.info("TIERED_V2"))["tiered_v2_allocated_bytes"] > 0:
+        while (await async_client.info("TIERED"))["tiered_allocated_bytes"] > 0:
            await asyncio.sleep(0.1)
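
Note for reviewers: the stats now flow through three layers before reaching INFO, so here is a minimal standalone sketch (not part of the patch) of the roll-up pattern. Per shard, TieredStorageV2::GetStats() composes one TieredStats out of the ShardOpManager counters, OpManager::GetStats() and SmallBins::GetStats(); ServerFamily::GetMetrics() then folds the per-shard structs together with operator+=. The struct fields and the ADD macro below mirror common.h/common.cc from this diff; the file name, the shard values and the 4096-byte page size are invented for illustration.

// stats_rollup_sketch.cc -- illustrative only; build with: g++ -std=c++17 stats_rollup_sketch.cc
#include <cstddef>
#include <cstdio>

using std::size_t;

// Mirrors struct TieredStats from src/server/common.h after this patch.
struct TieredStats {
  size_t total_stashes = 0;
  size_t total_fetches = 0;
  size_t total_cancels = 0;

  size_t allocated_bytes = 0;
  size_t capacity_bytes = 0;

  size_t pending_read_cnt = 0;
  size_t pending_stash_cnt = 0;

  size_t small_bins_cnt = 0;
  size_t small_bins_entries_cnt = 0;
  size_t small_bins_filling_bytes = 0;

  // Same shape as common.cc: aggregation is a plain per-field sum.
  TieredStats& operator+=(const TieredStats& o) {
#define ADD(x) (x) += o.x
    ADD(total_stashes);
    ADD(total_fetches);
    ADD(total_cancels);
    ADD(allocated_bytes);
    ADD(capacity_bytes);
    ADD(pending_read_cnt);
    ADD(pending_stash_cnt);
    ADD(small_bins_cnt);
    ADD(small_bins_entries_cnt);
    ADD(small_bins_filling_bytes);
#undef ADD
    return *this;
  }
};

int main() {
  // Pretend these came back from two shards' TieredStorageV2::GetStats();
  // the numbers are made up.
  TieredStats shard0, shard1;
  shard0.total_stashes = 90;
  shard0.allocated_bytes = 90 * 4096;
  shard1.total_stashes = 10;
  shard1.allocated_bytes = 10 * 4096;

  // What ServerFamily::GetMetrics() does for each shard before INFO TIERED
  // prints the totals.
  TieredStats total;
  total += shard0;
  total += shard1;

  std::printf("tiered_total_stashes: %zu\n", total.total_stashes);
  std::printf("tiered_allocated_bytes: %zu\n", total.allocated_bytes);
  return 0;
}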