From 8622c27ce14213a1cf6cf8a2963967b040294011 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 1 Aug 2024 21:27:15 +0300 Subject: [PATCH] chore: expose metric that shows how many task submitters are blocked (#3427) * chore: expose metric that shows how many task submitters are blocked This should help us in identifying deadlocks quickly. --------- Signed-off-by: Roman Gershman --- src/core/CMakeLists.txt | 2 +- src/core/task_queue.cc | 11 +++++++++++ src/core/task_queue.h | 28 +++++++++++++++++++++++++--- src/server/server_family.cc | 2 ++ src/server/server_family.h | 1 + src/server/tiered_storage_test.cc | 2 +- 6 files changed, 41 insertions(+), 5 deletions(-) create mode 100644 src/core/task_queue.cc diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 684b2a4cc..dccc1065b 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -5,7 +5,7 @@ set(SEARCH_LIB query_parser) add_library(dfly_core bloom.cc compact_object.cc dragonfly_core.cc extent_tree.cc interpreter.cc mi_memory_resource.cc sds_utils.cc - segment_allocator.cc score_map.cc small_string.cc sorted_map.cc + segment_allocator.cc score_map.cc small_string.cc sorted_map.cc task_queue.cc tx_queue.cc dense_set.cc allocation_tracker.cc string_set.cc string_map.cc detail/bitpacking.cc) diff --git a/src/core/task_queue.cc b/src/core/task_queue.cc new file mode 100644 index 000000000..209ff5ffd --- /dev/null +++ b/src/core/task_queue.cc @@ -0,0 +1,11 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "core/task_queue.h" + +namespace dfly { + +__thread unsigned TaskQueue::blocked_submitters_ = 0; + +} // namespace dfly diff --git a/src/core/task_queue.h b/src/core/task_queue.h index dac830493..7b223bc98 100644 --- a/src/core/task_queue.h +++ b/src/core/task_queue.h @@ -1,4 +1,4 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2024, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // @@ -23,11 +23,28 @@ class TaskQueue { } template bool Add(F&& f) { - return queue_.Add(std::forward(f)); + if (queue_.TryAdd(std::forward(f))) + return true; + + ++blocked_submitters_; + auto res = queue_.Add(std::forward(f)); + --blocked_submitters_; + return res; } template auto Await(F&& f) -> decltype(f()) { - return queue_.Await(std::forward(f)); + util::fb2::Done done; + using ResultType = decltype(f()); + util::detail::ResultMover mover; + + ++blocked_submitters_; + Add([&mover, f = std::forward(f), done]() mutable { + mover.Apply(f); + done.Notify(); + }); + --blocked_submitters_; + done.Wait(); + return std::move(mover).get(); } /** @@ -47,9 +64,14 @@ class TaskQueue { consumer_fiber_.JoinIfNeeded(); } + static unsigned blocked_submitters() { + return blocked_submitters_; + } + private: util::fb2::FiberQueue queue_; util::fb2::Fiber consumer_fiber_; + static __thread unsigned blocked_submitters_; }; } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index e28397204..b8232f44f 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1158,6 +1158,7 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* MetricType::GAUGE, &resp->body()); AppendMetricWithoutLabels("fibers_count", "", m.worker_fiber_count, MetricType::GAUGE, &resp->body()); + AppendMetricWithoutLabels("blocked_tasks", "", m.blocked_tasks, MetricType::GAUGE, &resp->body()); AppendMetricWithoutLabels("memory_max_bytes", "", max_memory_limit, MetricType::GAUGE, &resp->body()); @@ -1937,6 +1938,7 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { result.fiber_longrun_usec += fb2::FiberLongRunSumUsec(); result.worker_fiber_stack_size += fb2::WorkerFibersStackSize(); result.worker_fiber_count += fb2::WorkerFibersCount(); + result.blocked_tasks += TaskQueue::blocked_submitters(); result.coordinator_stats.Add(ss->stats); diff --git a/src/server/server_family.h b/src/server/server_family.h index fd7fb1420..f32ad8737 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -107,6 +107,7 @@ struct Metrics { // Max length of the all the tx shard-queues. uint32_t tx_queue_len = 0; uint32_t worker_fiber_count = 0; + uint32_t blocked_tasks = 0; size_t worker_fiber_stack_size = 0; InterpreterManager::Stats lua_stats; diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index cf71d4044..896ab8587 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -315,7 +315,7 @@ TEST_F(TieredStorageTest, MemoryPressure) { ASSERT_FALSE(true) << i << "\nInfo ALL:\n" << resp.GetString(); } // TODO: to remove it once used_mem_current is updated frequently. - ThisFiber::SleepFor(10us); + ThisFiber::SleepFor(100us); } EXPECT_LT(used_mem_peak.load(), 20_MB);