mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: expose metric that shows how many task submitters are blocked (#3427)
* chore: expose metric that shows how many task submitters are blocked This should help us in identifying deadlocks quickly. --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
e2b6cfb384
commit
8622c27ce1
6 changed files with 41 additions and 5 deletions
|
@ -5,7 +5,7 @@ set(SEARCH_LIB query_parser)
|
|||
|
||||
add_library(dfly_core bloom.cc compact_object.cc dragonfly_core.cc extent_tree.cc
|
||||
interpreter.cc mi_memory_resource.cc sds_utils.cc
|
||||
segment_allocator.cc score_map.cc small_string.cc sorted_map.cc
|
||||
segment_allocator.cc score_map.cc small_string.cc sorted_map.cc task_queue.cc
|
||||
tx_queue.cc dense_set.cc allocation_tracker.cc
|
||||
string_set.cc string_map.cc detail/bitpacking.cc)
|
||||
|
||||
|
|
11
src/core/task_queue.cc
Normal file
11
src/core/task_queue.cc
Normal file
|
@ -0,0 +1,11 @@
|
|||
// Copyright 2024, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "core/task_queue.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
__thread unsigned TaskQueue::blocked_submitters_ = 0;
|
||||
|
||||
} // namespace dfly
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
||||
// Copyright 2024, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
|
@ -23,11 +23,28 @@ class TaskQueue {
|
|||
}
|
||||
|
||||
template <typename F> bool Add(F&& f) {
|
||||
return queue_.Add(std::forward<F>(f));
|
||||
if (queue_.TryAdd(std::forward<F>(f)))
|
||||
return true;
|
||||
|
||||
++blocked_submitters_;
|
||||
auto res = queue_.Add(std::forward<F>(f));
|
||||
--blocked_submitters_;
|
||||
return res;
|
||||
}
|
||||
|
||||
template <typename F> auto Await(F&& f) -> decltype(f()) {
|
||||
return queue_.Await(std::forward<F>(f));
|
||||
util::fb2::Done done;
|
||||
using ResultType = decltype(f());
|
||||
util::detail::ResultMover<ResultType> mover;
|
||||
|
||||
++blocked_submitters_;
|
||||
Add([&mover, f = std::forward<F>(f), done]() mutable {
|
||||
mover.Apply(f);
|
||||
done.Notify();
|
||||
});
|
||||
--blocked_submitters_;
|
||||
done.Wait();
|
||||
return std::move(mover).get();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -47,9 +64,14 @@ class TaskQueue {
|
|||
consumer_fiber_.JoinIfNeeded();
|
||||
}
|
||||
|
||||
static unsigned blocked_submitters() {
|
||||
return blocked_submitters_;
|
||||
}
|
||||
|
||||
private:
|
||||
util::fb2::FiberQueue queue_;
|
||||
util::fb2::Fiber consumer_fiber_;
|
||||
static __thread unsigned blocked_submitters_;
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -1158,6 +1158,7 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
|
|||
MetricType::GAUGE, &resp->body());
|
||||
AppendMetricWithoutLabels("fibers_count", "", m.worker_fiber_count, MetricType::GAUGE,
|
||||
&resp->body());
|
||||
AppendMetricWithoutLabels("blocked_tasks", "", m.blocked_tasks, MetricType::GAUGE, &resp->body());
|
||||
|
||||
AppendMetricWithoutLabels("memory_max_bytes", "", max_memory_limit, MetricType::GAUGE,
|
||||
&resp->body());
|
||||
|
@ -1937,6 +1938,7 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
|
|||
result.fiber_longrun_usec += fb2::FiberLongRunSumUsec();
|
||||
result.worker_fiber_stack_size += fb2::WorkerFibersStackSize();
|
||||
result.worker_fiber_count += fb2::WorkerFibersCount();
|
||||
result.blocked_tasks += TaskQueue::blocked_submitters();
|
||||
|
||||
result.coordinator_stats.Add(ss->stats);
|
||||
|
||||
|
|
|
@ -107,6 +107,7 @@ struct Metrics {
|
|||
// Max length of the all the tx shard-queues.
|
||||
uint32_t tx_queue_len = 0;
|
||||
uint32_t worker_fiber_count = 0;
|
||||
uint32_t blocked_tasks = 0;
|
||||
size_t worker_fiber_stack_size = 0;
|
||||
|
||||
InterpreterManager::Stats lua_stats;
|
||||
|
|
|
@ -315,7 +315,7 @@ TEST_F(TieredStorageTest, MemoryPressure) {
|
|||
ASSERT_FALSE(true) << i << "\nInfo ALL:\n" << resp.GetString();
|
||||
}
|
||||
// TODO: to remove it once used_mem_current is updated frequently.
|
||||
ThisFiber::SleepFor(10us);
|
||||
ThisFiber::SleepFor(100us);
|
||||
}
|
||||
|
||||
EXPECT_LT(used_mem_peak.load(), 20_MB);
|
||||
|
|
Loading…
Reference in a new issue