chore(tiering): Lots of metrics (#2977)

Vladislav 2024-04-30 22:25:45 +03:00 committed by GitHub
parent 415839df79
commit fd5772a186
17 changed files with 149 additions and 110 deletions

@@ -257,34 +257,22 @@ bool ParseDouble(string_view src, double* value) {
#define ADD(x) (x) += o.x
IoMgrStats& IoMgrStats::operator+=(const IoMgrStats& rhs) {
static_assert(sizeof(IoMgrStats) == 16);
read_total += rhs.read_total;
read_delay_usec += rhs.read_delay_usec;
return *this;
}
TieredStats& TieredStats::operator+=(const TieredStats& o) {
static_assert(sizeof(TieredStats) == 48);
ADD(tiered_writes);
ADD(storage_capacity);
ADD(storage_reserved);
ADD(aborted_write_cnt);
ADD(flush_skip_cnt);
ADD(throttled_write_cnt);
return *this;
}
TieredStatsV2& TieredStatsV2::operator+=(const TieredStatsV2& o) {
static_assert(sizeof(TieredStatsV2) == 24);
static_assert(sizeof(TieredStats) == 80);
ADD(total_stashes);
ADD(total_fetches);
ADD(total_cancels);
ADD(allocated_bytes);
ADD(capacity_bytes);
ADD(pending_read_cnt);
ADD(pending_stash_cnt);
ADD(small_bins_cnt);
ADD(small_bins_entries_cnt);
ADD(small_bins_filling_bytes);
return *this;
}
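
For readers skimming the diff: ADD(x), defined at the top of this hunk, expands to (x) += o.x, so each ADD line folds one counter from the right-hand operand into the accumulator. A minimal self-contained illustration of the pattern, using a hypothetical Stats type rather than the commit's:

#include <cstdint>

#define ADD(x) (x) += o.x

struct Stats {
  uint64_t stashes = 0, fetches = 0;

  Stats& operator+=(const Stats& o) {
    ADD(stashes);  // expands to: (stashes) += o.stashes
    ADD(fetches);
    return *this;
  }
};

int main() {
  Stats total, shard;
  shard.stashes = 3;
  total += shard;  // total.stashes == 3
  return 0;
}

This is the same mechanism GetMetrics() uses further below to sum per-shard TieredStats snapshots into one server-wide value.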

@@ -59,33 +59,22 @@ struct LockTagOptions {
static const LockTagOptions& instance();
};
struct IoMgrStats {
uint64_t read_total = 0;
uint64_t read_delay_usec = 0;
IoMgrStats& operator+=(const IoMgrStats& rhs);
};
struct TieredStats {
uint64_t tiered_writes = 0;
size_t storage_capacity = 0;
// how much was reserved by actively stored items.
size_t storage_reserved = 0;
uint64_t aborted_write_cnt = 0;
uint64_t flush_skip_cnt = 0;
uint64_t throttled_write_cnt = 0;
TieredStats& operator+=(const TieredStats&);
};
struct TieredStatsV2 {
size_t total_stashes = 0;
size_t total_fetches = 0;
size_t allocated_bytes = 0;
size_t total_cancels = 0;
TieredStatsV2& operator+=(const TieredStatsV2&);
size_t allocated_bytes = 0;
size_t capacity_bytes = 0;
size_t pending_read_cnt = 0;
size_t pending_stash_cnt = 0;
size_t small_bins_cnt = 0;
size_t small_bins_entries_cnt = 0;
size_t small_bins_filling_bytes = 0;
TieredStats& operator+=(const TieredStats&);
};
struct SearchStats {

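The sizeof static_asserts seen here and in the .cc hunk (16, 48, 24 before, 80 after) act as a tripwire: appending a field changes the struct's size, the assert breaks, and the compile error points the author at the operator+= that must be extended as well. A self-contained sketch of the idea, assuming a typical LP64 target where size_t is 8 bytes:

#include <cstddef>

struct Counters {
  size_t a = 0, b = 0, c = 0;  // 3 * 8 bytes = 24 on LP64

  Counters& operator+=(const Counters& o) {
    // Breaks the build as soon as a fourth field is appended.
    static_assert(sizeof(Counters) == 24, "update operator+= when adding fields");
    a += o.a;
    b += o.b;
    c += o.c;
    return *this;
  }
};
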
@@ -1144,15 +1144,6 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("reply_total", "", m.facade_stats.reply_stats.send_stats.count,
MetricType::COUNTER, &resp->body());
// Tiered metrics.
if (m.disk_stats.read_total > 0) {
AppendMetricWithoutLabels("tiered_reads_total", "", m.disk_stats.read_total,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("tiered_reads_latency_seconds", "",
double(m.disk_stats.read_delay_usec) * 1e-6, MetricType::COUNTER,
&resp->body());
}
}
AppendMetricWithoutLabels("script_error_total", "", m.facade_stats.reply_stats.script_error_count,
@@ -1845,7 +1836,7 @@ Metrics ServerFamily::GetMetrics() const {
result.shard_stats += shard->stats();
if (shard->tiered_storage_v2()) {
result.tiered_stats_v2 += shard->tiered_storage_v2()->GetStats();
result.tiered_stats += shard->tiered_storage_v2()->GetStats();
}
if (shard->search_indices()) {
@@ -2057,23 +2048,19 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
}
if (should_enter("TIERED", true)) {
append("tiered_entries", total.tiered_entries);
append("tiered_bytes", total.tiered_size);
append("tiered_bytes_human", HumanReadableNumBytes(total.tiered_size));
append("tiered_reads", m.disk_stats.read_total);
append("tiered_read_latency_usec", m.disk_stats.read_delay_usec);
append("tiered_writes", m.tiered_stats.tiered_writes);
append("tiered_reserved", m.tiered_stats.storage_reserved);
append("tiered_capacity", m.tiered_stats.storage_capacity);
append("tiered_aborted_writes", m.tiered_stats.aborted_write_cnt);
append("tiered_flush_skipped", m.tiered_stats.flush_skip_cnt);
append("tiered_throttled_writes", m.tiered_stats.throttled_write_cnt);
}
append("tiered_total_stashes", m.tiered_stats.total_stashes);
append("tiered_total_fetches", m.tiered_stats.total_fetches);
append("tiered_total_cancels", m.tiered_stats.total_cancels);
if (should_enter("TIERED_V2", true)) {
append("tiered_v2_total_stashes", m.tiered_stats_v2.total_stashes);
append("tiered_v2_total_fetches", m.tiered_stats_v2.total_fetches);
append("tiered_v2_allocated_bytes", m.tiered_stats_v2.allocated_bytes);
append("tiered_allocated_bytes", m.tiered_stats.allocated_bytes);
append("tiered_capacity_bytes", m.tiered_stats.capacity_bytes);
append("tiered_pending_read_cnt", m.tiered_stats.pending_read_cnt);
append("tiered_pending_stash_cnt", m.tiered_stats.pending_stash_cnt);
append("tiered_small_bins_cnt", m.tiered_stats.small_bins_cnt);
append("tiered_small_bins_entries_cnt", m.tiered_stats.small_bins_entries_cnt);
append("tiered_small_bins_filling_bytes", m.tiered_stats.small_bins_filling_bytes);
}
if (should_enter("PERSISTENCE", true)) {

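With TIERED_V2 folded into TIERED, all tiering counters now land in a single INFO section. A hypothetical reply after this change (field names from the appends above, values purely illustrative):

# TIERED
tiered_entries:100
tiered_bytes:300000
tiered_bytes_human:292.97KiB
tiered_total_stashes:100
tiered_total_fetches:100
tiered_total_cancels:0
tiered_allocated_bytes:409600
tiered_capacity_bytes:1073741824
tiered_pending_read_cnt:0
tiered_pending_stash_cnt:0
tiered_small_bins_cnt:0
tiered_small_bins_entries_cnt:0
tiered_small_bins_filling_bytes:0
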
@@ -80,10 +80,8 @@ struct Metrics {
EngineShard::Stats shard_stats; // per-shard stats
facade::FacadeStats facade_stats; // client stats and buffer sizes
TieredStats tiered_stats; // stats for tiered storage
TieredStatsV2 tiered_stats_v2;
TieredStats tiered_stats;
IoMgrStats disk_stats; // disk stats for io_mgr
SearchStats search_stats;
ServerState::Stats coordinator_stats; // stats on transaction running
PeakStats peak_stats;

@@ -58,8 +58,10 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
// Clear IO pending flag for entry
void ClearIoPending(string_view key) {
if (auto pv = Find(key); pv)
if (auto pv = Find(key); pv) {
pv->SetIoPending(false);
stats_.total_cancels++;
}
}
// Clear IO pending flag for all contained entries of bin
@@ -109,13 +111,9 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
}
}
TieredStatsV2 GetStats() const {
auto stats = stats_;
stats.allocated_bytes = OpManager::storage_.GetStats().allocated_bytes;
return stats;
}
private:
friend class TieredStorageV2;
PrimeValue* Find(string_view key) {
// TODO: Get DbContext for transaction for correct dbid and time
auto it = db_slice_->FindMutable(DbContext{}, key);
@@ -124,7 +122,9 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
bool cache_fetched_ = false;
TieredStatsV2 stats_;
struct {
size_t total_stashes = 0, total_fetches = 0, total_cancels = 0;
} stats_;
TieredStorageV2* ts_;
DbSlice* db_slice_;
@@ -218,8 +218,32 @@ bool TieredStorageV2::ShouldStash(const PrimeValue& pv) {
return !pv.IsExternal() && pv.ObjType() == OBJ_STRING && pv.Size() >= kMinValueSize;
}
TieredStatsV2 TieredStorageV2::GetStats() const {
return op_manager_->GetStats();
TieredStats TieredStorageV2::GetStats() const {
TieredStats stats{};
{ // ShardOpManager stats
auto shard_stats = op_manager_->stats_;
stats.total_fetches = shard_stats.total_fetches;
stats.total_stashes = shard_stats.total_stashes;
stats.total_cancels = shard_stats.total_cancels;
}
{ // OpManager stats
tiering::OpManager::Stats op_stats = op_manager_->GetStats();
stats.pending_read_cnt = op_stats.pending_read_cnt;
stats.pending_stash_cnt = op_stats.pending_stash_cnt;
stats.allocated_bytes = op_stats.disk_stats.allocated_bytes;
stats.capacity_bytes = op_stats.disk_stats.capacity_bytes;
}
{ // SmallBins stats
tiering::SmallBins::Stats bins_stats = bins_->GetStats();
stats.small_bins_cnt = bins_stats.stashed_bins_cnt;
stats.small_bins_entries_cnt = bins_stats.stashed_entries_cnt;
stats.small_bins_filling_bytes = bins_stats.current_bin_bytes;
}
return stats;
}
} // namespace dfly
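
Reporting allocation and capacity side by side makes derived gauges trivial for callers; for instance, a sketch of a utilization helper (a hypothetical function, assuming only the TieredStats fields introduced in this commit):

// Fraction of backing storage currently allocated; guards against a
// zero-capacity (not yet opened) backing file.
double DiskUtilization(const dfly::TieredStats& s) {
  return s.capacity_bytes > 0 ? double(s.allocated_bytes) / double(s.capacity_bytes)
                              : 0.0;
}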

@@ -58,7 +58,7 @@ class TieredStorageV2 {
// Returns if a value should be stashed
bool ShouldStash(const PrimeValue& pv);
TieredStatsV2 GetStats() const;
TieredStats GetStats() const;
private:
std::unique_ptr<ShardOpManager> op_manager_;
@@ -108,8 +108,8 @@ class TieredStorageV2 {
void Delete(std::string_view key, PrimeValue* value) {
}
TieredStatsV2 GetStats() {
return TieredStatsV2{};
TieredStats GetStats() {
return TieredStats{};
}
};

@@ -63,7 +63,7 @@ TEST_F(TieredStorageV2Test, SimpleGetSet) {
// Make sure all entries were stashed, except the few not filling a small page
size_t stashes = 0;
ExpectConditionWithinTimeout([this, &stashes] {
stashes = GetMetrics().tiered_stats_v2.total_stashes;
stashes = GetMetrics().tiered_stats.total_stashes;
return stashes >= kMax - 64 - 1;
});
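
ExpectConditionWithinTimeout has to poll because stashes complete asynchronously. A self-contained sketch of the polling idiom (a hypothetical WaitFor helper on plain threads; the repo's ExpectConditionWithinTimeout runs on fibers):

#include <chrono>
#include <functional>
#include <thread>

// Retry the predicate until it holds or the deadline passes.
bool WaitFor(const std::function<bool()>& cond,
             std::chrono::milliseconds timeout = std::chrono::milliseconds(1000)) {
  auto deadline = std::chrono::steady_clock::now() + timeout;
  while (std::chrono::steady_clock::now() < deadline) {
    if (cond()) return true;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  return false;
}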

@@ -119,7 +119,6 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
memcpy(buf.bytes.data(), bytes.data(), bytes.length());
auto io_cb = [this, cb, offset, buf, len = bytes.size()](int io_res) {
VLOG(0) << "IoRes " << io_res << " " << len;
if (io_res < 0) {
MarkAsFree({size_t(offset), len});
cb({}, std::error_code{-io_res, std::system_category()});
@@ -134,7 +133,7 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
}
DiskStorage::Stats DiskStorage::GetStats() const {
return {alloc_.allocated_bytes()};
return {alloc_.allocated_bytes(), alloc_.capacity()};
}
} // namespace dfly::tiering

@@ -19,6 +19,7 @@ class DiskStorage {
public:
struct Stats {
size_t allocated_bytes = 0;
size_t capacity_bytes = 0;
};
using ReadCb = std::function<void(std::string_view, std::error_code)>;

@@ -8,6 +8,7 @@
#include "base/gtest.h"
#include "base/logging.h"
#include "server/tiering/common.h"
#include "server/tiering/test_common.h"
#include "util/fibers/fibers.h"
#include "util/fibers/pool.h"
@@ -59,12 +60,16 @@ struct DiskStorageTest : public PoolTestBase {
last_reads_.erase(index);
}
void Wait() {
void Wait() const {
while (pending_ops_ > 0) {
::util::ThisFiber::SleepFor(1ms);
}
}
DiskStorage::Stats GetStats() const {
return storage_->GetStats();
}
protected:
int pending_ops_ = 0;
@@ -82,6 +87,8 @@ TEST_F(DiskStorageTest, Basic) {
Wait();
EXPECT_EQ(segments_.size(), 100);
EXPECT_EQ(GetStats().allocated_bytes, 100 * kPageSize);
// Read all 100 values
for (size_t i = 0; i < 100; i++)
Read(i);
@@ -91,6 +98,11 @@
for (size_t i = 0; i < 100; i++)
EXPECT_EQ(last_reads_[i], absl::StrCat("value", i));
// Delete all values
for (size_t i = 0; i < 100; i++)
Delete(i);
EXPECT_EQ(GetStats().allocated_bytes, 0);
Close();
});
}

@@ -47,16 +47,11 @@ class IoMgr {
return grow_progress_;
}
const IoMgrStats& GetStats() const {
return stats_;
}
private:
std::unique_ptr<util::fb2::LinuxFile> backing_file_;
size_t sz_ = 0;
bool grow_progress_ = false;
IoMgrStats stats_;
};
} // namespace dfly::tiering

@@ -44,7 +44,7 @@ void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) {
void OpManager::Delete(EntryId id) {
// If the item isn't offloaded, it has io pending, so cancel it
DCHECK(pending_stash_ver_.count(ToOwned(id)));
++pending_stash_ver_[ToOwned(id)];
pending_stash_ver_.erase(ToOwned(id));
}
void OpManager::Delete(DiskSegment segment) {
@@ -89,7 +89,8 @@ void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment
it != pending_stash_ver_.end() && it->second == version) {
pending_stash_ver_.erase(it);
ReportStashed(id, segment, ec);
} else {
} else if (!ec) {
// Throw away the value because it's no longer up-to-date even if no error occurred
storage_.MarkAsFree(segment);
}
}
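
These two changes are the heart of stash cancellation: Delete() now erases the pending entry outright instead of bumping its version, and ProcessStashed() frees the freshly written segment when the version no longer matches, but only if the write itself succeeded. A self-contained sketch of the versioning idea (hypothetical names, simplified types):

#include <string>
#include <unordered_map>

struct StashTracker {
  // id -> version of the stash currently in flight.
  std::unordered_map<std::string, unsigned> pending_;

  // Record a new pending stash; re-issuing bumps the version, so results of
  // older in-flight writes for the same id become stale.
  unsigned Issue(const std::string& id) { return ++pending_[id]; }

  // Cancel: any write that later completes for this id will be discarded.
  void Cancel(const std::string& id) { pending_.erase(id); }

  // Returns true if the completed stash is still wanted; when false, the
  // caller should mark the written disk segment as free again.
  bool OnStashed(const std::string& id, unsigned version) {
    auto it = pending_.find(id);
    if (it == pending_.end() || it->second != version) return false;
    pending_.erase(it);
    return true;
  }
};
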
@@ -125,4 +126,10 @@ OpManager::EntryOps& OpManager::ReadOp::ForId(EntryId id, DiskSegment key_segmen
return key_ops.emplace_back(ToOwned(id), key_segment);
}
OpManager::Stats OpManager::GetStats() const {
return {.disk_stats = storage_.GetStats(),
.pending_read_cnt = pending_reads_.size(),
.pending_stash_cnt = pending_stash_ver_.size()};
}
} // namespace dfly::tiering

@@ -20,10 +20,18 @@ namespace dfly::tiering {
// safely schedules deletes after reads and allows cancelling pending stashes
class OpManager {
public:
struct Stats {
DiskStorage::Stats disk_stats;
size_t pending_read_cnt = 0;
size_t pending_stash_cnt = 0;
};
// Two separate keyspaces are provided - one for strings, one for numeric identifiers.
// Ids can be used to track auxiliary values that don't map to real keys (like packed pages).
using EntryId = std::variant<unsigned, std::string_view>;
using OwnedEntryId = std::variant<unsigned, std::string>;
// Callback for post-read completion. Returns whether the value was modified
using ReadCallback = std::function<bool(std::string*)>;
@@ -47,6 +55,8 @@ class OpManager {
// Stash value to be offloaded
std::error_code Stash(EntryId id, std::string_view value);
Stats GetStats() const;
protected:
// Report that a stash succeeded and the entry was stored at the provided segment or failed with
// given error

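Stats is returned by value (see the .cc hunk above), so callers get a consistent snapshot they can hold, diff, or aggregate. A self-contained sketch of the snapshot-struct pattern, with hypothetical names rather than the commit's types:

#include <cstddef>

struct DiskStats {
  size_t allocated_bytes = 0;
  size_t capacity_bytes = 0;
};

struct OpStats {
  DiskStats disk_stats;
  size_t pending_read_cnt = 0;
  size_t pending_stash_cnt = 0;
};

class Manager {
 public:
  // Assembled with designated initializers, as OpManager::GetStats() does above.
  OpStats GetStats() const {
    return {.disk_stats = {alloc_bytes_, cap_bytes_},
            .pending_read_cnt = reads_,
            .pending_stash_cnt = stashes_};
  }

 private:
  size_t alloc_bytes_ = 0, cap_bytes_ = 0, reads_ = 0, stashes_ = 0;
};
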
@@ -63,9 +63,13 @@ TEST_F(OpManagerTest, SimpleStashesWithReads) {
EXPECT_FALSE(Stash(i, absl::StrCat("VALUE", i, "real")));
}
EXPECT_EQ(GetStats().pending_stash_cnt, 100);
while (stashed_.size() < 100)
util::ThisFiber::SleepFor(1ms);
EXPECT_EQ(GetStats().disk_stats.allocated_bytes, 100 * kPageSize);
for (unsigned i = 0; i < 100; i++) {
EXPECT_GE(stashed_[i].offset, i > 0);
EXPECT_EQ(stashed_[i].length, 10 + (i > 9));

@@ -59,7 +59,7 @@ SmallBins::FilledBin SmallBins::FlushBin() {
data += value.size();
}
current_bin_bytes_ = 0; // num hashes
current_bin_bytes_ = 0;
current_bin_.erase(current_bin_.begin(), current_bin_.end());
return {id, std::move(out)};
@@ -67,9 +67,13 @@ SmallBins::FilledBin SmallBins::FlushBin() {
SmallBins::KeySegmentList SmallBins::ReportStashed(BinId id, DiskSegment segment) {
auto key_list = pending_bins_.extract(id);
DCHECK_GT(key_list.mapped().size(), 0u);
auto segment_list = SmallBins::KeySegmentList{key_list.mapped().begin(), key_list.mapped().end()};
for (auto& [_, sub_segment] : segment_list)
sub_segment.offset += segment.offset;
stats_.total_stashed_entries += segment_list.size();
return segment_list;
}
@@ -98,12 +102,20 @@ std::optional<SmallBins::BinId> SmallBins::Delete(std::string_view key) {
std::optional<DiskSegment> SmallBins::Delete(DiskSegment segment) {
segment = segment.FillPages();
if (auto it = stashed_bins_.find(segment.offset);
it != stashed_bins_.end() && --it->second == 0) {
stashed_bins_.erase(it);
return segment;
if (auto it = stashed_bins_.find(segment.offset); it != stashed_bins_.end()) {
stats_.total_stashed_entries--;
if (--it->second == 0) {
stashed_bins_.erase(it);
return segment;
}
}
return std::nullopt;
}
SmallBins::Stats SmallBins::GetStats() const {
return Stats{.stashed_bins_cnt = stashed_bins_.size(),
.stashed_entries_cnt = stats_.total_stashed_entries,
.current_bin_bytes = current_bin_bytes_};
}
} // namespace dfly::tiering
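
stashed_bins_ doubles as a refcount table: many small entries share one stashed bin page, so the page can be reclaimed only when its last entry dies, and total_stashed_entries is decremented alongside. A self-contained sketch of that bookkeeping (hypothetical names):

#include <cstddef>
#include <optional>
#include <unordered_map>

struct BinRefs {
  // page offset -> number of live entries still stored in that bin.
  std::unordered_map<size_t, unsigned> refs_;

  void AddBin(size_t offset, unsigned entries) { refs_[offset] = entries; }

  // Returns the offset once the last entry is gone, signalling that the
  // whole page can be handed back to the disk allocator.
  std::optional<size_t> DeleteEntry(size_t offset) {
    auto it = refs_.find(offset);
    if (it != refs_.end() && --it->second == 0) {
      refs_.erase(it);
      return offset;
    }
    return std::nullopt;
  }
};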

@@ -19,6 +19,12 @@ namespace dfly::tiering {
// SIMPLEST VERSION for now.
class SmallBins {
public:
struct Stats {
size_t stashed_bins_cnt = 0;
size_t stashed_entries_cnt = 0;
size_t current_bin_bytes = 0;
};
using BinId = unsigned;
// Bin filled with blob of serialized entries
@@ -42,6 +48,8 @@ class SmallBins {
// Delete a stored segment. Returns the page segment if it became empty and needs to be deleted.
std::optional<DiskSegment> Delete(DiskSegment segment);
Stats GetStats() const;
private:
// Flush current bin
FilledBin FlushBin();
@@ -58,6 +66,10 @@ class SmallBins {
// Map of bins that were stashed and should be deleted when refcount reaches 0
absl::flat_hash_map<size_t /*offset*/, unsigned /* refcount*/> stashed_bins_;
struct {
size_t total_stashed_entries = 0;
} stats_;
};
}; // namespace dfly::tiering

@@ -21,18 +21,19 @@ async def test_tiering_simple(async_client: aioredis.Redis):
# Wait for all to be offloaded
with async_timeout.timeout(1):
info = await async_client.info("TIERED_V2")
while info["tiered_v2_total_stashes"] != 100:
info = await async_client.info("TIERED_V2")
info = await async_client.info("TIERED")
print(info)
while info["tiered_total_stashes"] != 100:
info = await async_client.info("TIERED")
await asyncio.sleep(0.1)
assert 3000 * 100 <= info["tiered_v2_allocated_bytes"] <= 4096 * 100
assert 3000 * 100 <= info["tiered_allocated_bytes"] <= 4096 * 100
# Fetch back
for key in (f"k{i}" for i in range(1, 100 + 1)):
assert len(await async_client.execute_command("GET", key)) == 3000
assert (await async_client.info("TIERED_V2"))["tiered_v2_total_fetches"] == 100
assert (await async_client.info("TIERED"))["tiered_total_fetches"] == 100
# Wait to be deleted
with async_timeout.timeout(1):
while (await async_client.info("TIERED_V2"))["tiered_v2_allocated_bytes"] > 0:
while (await async_client.info("TIERED"))["tiered_allocated_bytes"] > 0:
await asyncio.sleep(0.1)