mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
* Fixes #66. 1. Introduce `hz` flag to control the frequency of periodically running tasks. 2. Update helio dependency. * Fix CI caching for PRs.
This commit is contained in:
parent
33af405434
commit
d5f2c23922
5 changed files with 26 additions and 13 deletions
4
.github/workflows/ci.yml
vendored
4
.github/workflows/ci.yml
vendored
|
@ -43,9 +43,9 @@ jobs:
|
||||||
path: |
|
path: |
|
||||||
~/.ccache
|
~/.ccache
|
||||||
${{github.workspace}}/build/_deps
|
${{github.workspace}}/build/_deps
|
||||||
key: ${{ runner.os }}-deps-${{ github.sha }}
|
key: ${{ runner.os }}-deps-${{ github.base_ref }}-${{ github.sha }}
|
||||||
restore-keys: |
|
restore-keys: |
|
||||||
${{ runner.os }}-deps-
|
${{ runner.os }}-deps-${{ github.base_ref }}-
|
||||||
|
|
||||||
- name: Configure CMake
|
- name: Configure CMake
|
||||||
# Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make.
|
# Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make.
|
||||||
|
|
2
helio
2
helio
|
@ -1 +1 @@
|
||||||
Subproject commit 490814e3f0586e6974a462ddfe42905b71b90c3b
|
Subproject commit 439b47b13ac5260ce0ba094e8166c2170965387b
|
|
@ -21,6 +21,12 @@ extern "C" {
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
ABSL_FLAG(string, backing_prefix, "", "");
|
ABSL_FLAG(string, backing_prefix, "", "");
|
||||||
|
|
||||||
|
ABSL_FLAG(uint32_t, hz, 1000,
|
||||||
|
"Base frequency at which the server updates its expiry clock "
|
||||||
|
"and performs other background tasks. Warning: not advised to decrease in production, "
|
||||||
|
"because it can affect expiry precision for PSETEX etc.");
|
||||||
|
|
||||||
ABSL_DECLARE_FLAG(bool, cache_mode);
|
ABSL_DECLARE_FLAG(bool, cache_mode);
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
@ -28,6 +34,7 @@ namespace dfly {
|
||||||
using namespace util;
|
using namespace util;
|
||||||
namespace this_fiber = ::boost::this_fiber;
|
namespace this_fiber = ::boost::this_fiber;
|
||||||
namespace fibers = ::boost::fibers;
|
namespace fibers = ::boost::fibers;
|
||||||
|
using absl::GetFlag;
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
@ -48,16 +55,18 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
|
||||||
|
|
||||||
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
|
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
|
||||||
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
|
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
|
||||||
db_slice_(pb->GetIndex(), absl::GetFlag(FLAGS_cache_mode), this) {
|
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
|
||||||
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
|
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
|
||||||
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
|
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
|
||||||
queue_.Run();
|
queue_.Run();
|
||||||
});
|
});
|
||||||
|
|
||||||
if (update_db_time) {
|
if (update_db_time) {
|
||||||
constexpr uint32_t kClockCycleMs = 1;
|
uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
|
||||||
|
if (clock_cycle_ms == 0)
|
||||||
|
clock_cycle_ms = 1;
|
||||||
|
|
||||||
periodic_task_ = pb->AddPeriodic(kClockCycleMs, [this] { Heartbeat(); });
|
periodic_task_ = pb->AddPeriodic(clock_cycle_ms, [this] { Heartbeat(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
tmp_str1 = sdsempty();
|
tmp_str1 = sdsempty();
|
||||||
|
@ -92,7 +101,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
|
||||||
CompactObj::InitThreadLocal(shard_->memory_resource());
|
CompactObj::InitThreadLocal(shard_->memory_resource());
|
||||||
SmallString::InitThreadLocal(data_heap);
|
SmallString::InitThreadLocal(data_heap);
|
||||||
|
|
||||||
string backing_prefix = absl::GetFlag(FLAGS_backing_prefix);
|
string backing_prefix = GetFlag(FLAGS_backing_prefix);
|
||||||
if (!backing_prefix.empty()) {
|
if (!backing_prefix.empty()) {
|
||||||
string fn =
|
string fn =
|
||||||
absl::StrCat(backing_prefix, "-", absl::Dec(pb->GetIndex(), absl::kZeroPad4), ".ssd");
|
absl::StrCat(backing_prefix, "-", absl::Dec(pb->GetIndex(), absl::kZeroPad4), ".ssd");
|
||||||
|
|
|
@ -33,9 +33,9 @@ extern "C" {
|
||||||
#include "server/replica.h"
|
#include "server/replica.h"
|
||||||
#include "server/script_mgr.h"
|
#include "server/script_mgr.h"
|
||||||
#include "server/server_state.h"
|
#include "server/server_state.h"
|
||||||
#include "server/version.h"
|
|
||||||
#include "server/tiered_storage.h"
|
#include "server/tiered_storage.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
|
#include "server/version.h"
|
||||||
#include "strings/human_readable.h"
|
#include "strings/human_readable.h"
|
||||||
#include "util/accept_server.h"
|
#include "util/accept_server.h"
|
||||||
#include "util/uring/uring_file.h"
|
#include "util/uring/uring_file.h"
|
||||||
|
@ -48,6 +48,7 @@ ABSL_FLAG(string, requirepass, "", "password for AUTH authentication");
|
||||||
|
|
||||||
ABSL_DECLARE_FLAG(uint32_t, port);
|
ABSL_DECLARE_FLAG(uint32_t, port);
|
||||||
ABSL_DECLARE_FLAG(bool, cache_mode);
|
ABSL_DECLARE_FLAG(bool, cache_mode);
|
||||||
|
ABSL_DECLARE_FLAG(uint32_t, hz);
|
||||||
|
|
||||||
extern "C" mi_stats_t _mi_stats_main;
|
extern "C" mi_stats_t _mi_stats_main;
|
||||||
|
|
||||||
|
@ -56,10 +57,10 @@ namespace dfly {
|
||||||
using namespace util;
|
using namespace util;
|
||||||
namespace fibers = ::boost::fibers;
|
namespace fibers = ::boost::fibers;
|
||||||
namespace fs = std::filesystem;
|
namespace fs = std::filesystem;
|
||||||
|
using absl::GetFlag;
|
||||||
using absl::StrCat;
|
using absl::StrCat;
|
||||||
using facade::MCReplyBuilder;
|
using facade::MCReplyBuilder;
|
||||||
using strings::HumanReadableNumBytes;
|
using strings::HumanReadableNumBytes;
|
||||||
using absl::GetFlag;
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
@ -170,7 +171,10 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
|
||||||
used_mem_peak.store(sum, memory_order_relaxed);
|
used_mem_peak.store(sum, memory_order_relaxed);
|
||||||
};
|
};
|
||||||
|
|
||||||
task_10ms_ = pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(10, cache_cb); });
|
uint32_t cache_hz = max(GetFlag(FLAGS_hz) / 10, 1u);
|
||||||
|
uint32_t period_ms = max(1u, 1000 / cache_hz);
|
||||||
|
stats_caching_task_ =
|
||||||
|
pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });
|
||||||
|
|
||||||
fs::path data_folder = fs::current_path();
|
fs::path data_folder = fs::current_path();
|
||||||
const auto& dir = GetFlag(FLAGS_dir);
|
const auto& dir = GetFlag(FLAGS_dir);
|
||||||
|
@ -197,8 +201,8 @@ void ServerFamily::Shutdown() {
|
||||||
load_fiber_.join();
|
load_fiber_.join();
|
||||||
|
|
||||||
pb_task_->Await([this] {
|
pb_task_->Await([this] {
|
||||||
pb_task_->CancelPeriodic(task_10ms_);
|
pb_task_->CancelPeriodic(stats_caching_task_);
|
||||||
task_10ms_ = 0;
|
stats_caching_task_ = 0;
|
||||||
|
|
||||||
unique_lock lk(replicaof_mu_);
|
unique_lock lk(replicaof_mu_);
|
||||||
if (replica_) {
|
if (replica_) {
|
||||||
|
|
|
@ -114,7 +114,7 @@ class ServerFamily {
|
||||||
|
|
||||||
boost::fibers::fiber load_fiber_;
|
boost::fibers::fiber load_fiber_;
|
||||||
|
|
||||||
uint32_t task_10ms_ = 0;
|
uint32_t stats_caching_task_ = 0;
|
||||||
Service& service_;
|
Service& service_;
|
||||||
|
|
||||||
util::AcceptServer* acceptor_ = nullptr;
|
util::AcceptServer* acceptor_ = nullptr;
|
||||||
|
|
Loading…
Reference in a new issue