
fix: separate Heartbeat and ShardHandler to fibers (#3936)

* separate shard_handler from Heartbeat
* add test

---------

Signed-off-by: kostas <kostas@dragonflydb.io>
Authored by Kostas Kyrimis on 2024-10-29 09:22:53 +02:00, committed by GitHub
parent f16a32582e
commit 4b495182e8
6 changed files with 135 additions and 44 deletions
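
Note on the fix: before this change a single periodic fiber ran Heartbeat() and then the shard handler in one loop (see the removed RunPeriodic below), so a Heartbeat() that blocked, e.g. on a journal write to a replica that stopped reading its socket, prevented the shard handler from ever running, and that handler is what breaks stalled replication flows. A minimal sketch of the two shapes, written in plain asyncio for illustration rather than the util::fb2 fiber API the server actually uses (combined_loop and split_loops are hypothetical names):

import asyncio

# Pre-fix shape: one loop drives both steps, so a heartbeat that suspends
# for a long time starves the shard handler.
async def combined_loop(heartbeat, shard_handler, period, done: asyncio.Event):
    while not done.is_set():
        await asyncio.sleep(period)
        await heartbeat()      # may suspend for a long time...
        await shard_handler()  # ...so this may never get a turn

# Post-fix shape: each callback runs on its own task (a fiber in dragonfly),
# so a stuck heartbeat no longer keeps the shard handler from running.
async def split_loops(heartbeat, shard_handler, period, done: asyncio.Event):
    async def loop(f, p):
        while not done.is_set():
            await asyncio.sleep(p)
            await f()
    # the handler ticks at most every 100ms, mirroring the minimum period
    # in StartPeriodicShardHandlerFiber below
    await asyncio.gather(loop(heartbeat, period),
                         loop(shard_handler, max(0.1, period)))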

src/server/engine_shard.cc

@@ -391,28 +391,72 @@ void EngineShard::Shutdown() {
   queue_.Shutdown();
   queue2_.Shutdown();
-  DCHECK(!fiber_periodic_.IsJoinable());
+  DCHECK(!fiber_heartbeat_periodic_.IsJoinable());
+  DCHECK(!fiber_shard_handler_periodic_.IsJoinable());

   ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
 }

 void EngineShard::StopPeriodicFiber() {
-  fiber_periodic_done_.Notify();
-  if (fiber_periodic_.IsJoinable()) {
-    fiber_periodic_.Join();
+  fiber_heartbeat_periodic_done_.Notify();
+  if (fiber_heartbeat_periodic_.IsJoinable()) {
+    fiber_heartbeat_periodic_.Join();
+  }
+  fiber_shard_handler_periodic_done_.Notify();
+  if (fiber_shard_handler_periodic_.IsJoinable()) {
+    fiber_shard_handler_periodic_.Join();
   }
 }

-void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler) {
+static void RunFPeriodically(std::function<void()> f, std::chrono::milliseconds period_ms,
+                             std::string_view error_msg, util::fb2::Done* waiter) {
+  int64_t last_heartbeat_ms = INT64_MAX;
+
+  while (true) {
+    if (waiter->WaitFor(period_ms)) {
+      VLOG(2) << "finished running engine shard periodic task";
+      return;
+    }
+
+    int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
+    if (now_ms - 5 * period_ms.count() > last_heartbeat_ms) {
+      VLOG(1) << "This " << error_msg << " step took " << now_ms - last_heartbeat_ms << "ms";
+    }
+    f();
+    last_heartbeat_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
+  }
+}
+
+void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) {
   uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
   if (clock_cycle_ms == 0)
     clock_cycle_ms = 1;

-  fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
-                               handler = std::move(global_handler)] {
-    ThisFiber::SetName(absl::StrCat("shard_periodic", index));
-    RunPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
-  });
+  auto heartbeat = [this]() { Heartbeat(); };
+
+  std::chrono::milliseconds period_ms(clock_cycle_ms);
+  fiber_heartbeat_periodic_ =
+      MakeFiber([this, index = pb->GetPoolIndex(), period_ms, heartbeat]() mutable {
+        ThisFiber::SetName(absl::StrCat("heartbeat_periodic", index));
+        RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_);
+      });
+}
+
+void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb,
+                                                 std::function<void()> shard_handler) {
+  uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
+  if (clock_cycle_ms == 0)
+    clock_cycle_ms = 1;
+
+  // Minimum 100ms
+  std::chrono::milliseconds period_ms(std::max((uint32_t)100, clock_cycle_ms));
+  fiber_shard_handler_periodic_ = MakeFiber(
+      [this, index = pb->GetPoolIndex(), period_ms, handler = std::move(shard_handler)]() mutable {
+        ThisFiber::SetName(absl::StrCat("shard_handler_periodic", index));
+        RunFPeriodically(std::move(handler), period_ms, "shard handler",
+                         &fiber_shard_handler_periodic_done_);
+      });
 }

 void EngineShard::InitThreadLocal(ProactorBase* pb) {
@@ -694,32 +738,6 @@ void EngineShard::RetireExpiredAndEvict() {
   }
 }

-void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
-                              std::function<void()> shard_handler) {
-  VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms";
-
-  int64_t last_heartbeat_ms = INT64_MAX;
-  int64_t last_handler_ms = 0;
-
-  while (true) {
-    if (fiber_periodic_done_.WaitFor(period_ms)) {
-      VLOG(2) << "finished running engine shard periodic task";
-      return;
-    }
-
-    int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
-    if (now_ms - 5 * period_ms.count() > last_heartbeat_ms) {
-      VLOG(1) << "This heartbeat-sleep took " << now_ms - last_heartbeat_ms << "ms";
-    }
-    Heartbeat();
-    last_heartbeat_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
-
-    if (shard_handler && last_handler_ms + 100 < last_heartbeat_ms) {
-      last_handler_ms = last_heartbeat_ms;
-      shard_handler();
-    }
-  }
-}
-
 void EngineShard::CacheStats() {
   uint64_t now = fb2::ProactorBase::GetMonotonicTimeNs();
   if (cache_stats_time_ + 1000000 > now)  // 1ms
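
Both new fibers share the RunFPeriodically helper added above: a single util::fb2::Done per fiber doubles as tick timer and shutdown signal, since WaitFor(period_ms) returns true when the Done is notified (shutdown) and false when the period elapses (run the next tick), and the helper logs a warning when a step takes longer than five periods. A rough asyncio analog of the same pattern (run_f_periodically is a hypothetical name; asyncio.Event stands in for util::fb2::Done):

import asyncio
import time

async def run_f_periodically(f, period: float, name: str, done: asyncio.Event):
    last_ms = float("inf")
    while True:
        try:
            # Done::WaitFor analog: a normal return means the event was set
            # (shutdown requested), a timeout means it is time for the next tick.
            await asyncio.wait_for(done.wait(), timeout=period)
            return
        except asyncio.TimeoutError:
            pass
        now_ms = time.monotonic() * 1000
        if now_ms - 5 * period * 1000 > last_ms:
            print(f"This {name} step took {now_ms - last_ms:.0f}ms")
        f()
        last_ms = time.monotonic() * 1000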

src/server/engine_shard.h

@@ -206,13 +206,12 @@ class EngineShard {
   // blocks the calling fiber.
   void Shutdown();  // called before destructing EngineShard.

-  void StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> shard_handler);
+  void StartPeriodicHeartbeatFiber(util::ProactorBase* pb);
+  void StartPeriodicShardHandlerFiber(util::ProactorBase* pb, std::function<void()> shard_handler);

   void Heartbeat();
   void RetireExpiredAndEvict();

-  void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> shard_handler);
-
   void CacheStats();

   // We are running a task that checks whether we need to
@@ -253,8 +252,11 @@ class EngineShard {
   IntentLock shard_lock_;

   uint32_t defrag_task_ = 0;
-  util::fb2::Fiber fiber_periodic_;
-  util::fb2::Done fiber_periodic_done_;
+  util::fb2::Fiber fiber_heartbeat_periodic_;
+  util::fb2::Done fiber_heartbeat_periodic_done_;
+  util::fb2::Fiber fiber_shard_handler_periodic_;
+  util::fb2::Done fiber_shard_handler_periodic_done_;

   DefragTaskState defrag_state_;
   std::unique_ptr<TieredStorage> tiered_storage_;

src/server/engine_shard_set.cc

@@ -121,7 +121,8 @@ void EngineShardSet::Init(uint32_t sz, std::function<void()> shard_handler) {

       // Must be last, as it accesses objects initialized above.
       // We can not move shard_handler because this code is called multiple times.
-      shard->StartPeriodicFiber(pb, shard_handler);
+      shard->StartPeriodicHeartbeatFiber(pb);
+      shard->StartPeriodicShardHandlerFiber(pb, shard_handler);
     }
   });
 }

src/server/engine_shard_set.h

@@ -114,7 +114,6 @@ class EngineShardSet {
  private:
   void InitThreadLocal(util::ProactorBase* pb);

   util::ProactorPool* pp_;
   std::unique_ptr<EngineShard*[]> shards_;
   uint32_t size_ = 0;

tests/dragonfly/replication_test.py

@@ -2317,7 +2317,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory,
     await c_replica.execute_command(
         "debug replica pause"
-    )  # puase replica to trigger reconnect on master
+    )  # pause replica to trigger reconnect on master

     await asyncio.sleep(1)
@@ -2629,7 +2629,56 @@ async def test_replica_of_replica(df_factory):

+@pytest.mark.asyncio
+async def test_replication_timeout_on_full_sync_heartbeat_expiry(
+    df_factory: DflyInstanceFactory, df_seeder_factory
+):
+    # Timeout set to 3 seconds because we must first saturate the socket such that subsequent
+    # writes block. Otherwise, we will break the flows before Heartbeat actually deadlocks.
+    master = df_factory.create(
+        proactor_threads=2, replication_timeout=3000, vmodule="replica=2,dflycmd=2"
+    )
+    replica = df_factory.create()
+
+    df_factory.start_all([master, replica])
+
+    c_master = master.client()
+    c_replica = replica.client()
+
+    await c_master.execute_command("debug", "populate", "100000", "foo", "5000")
+
+    c_master = master.client()
+    c_replica = replica.client()
+
+    seeder = ExpirySeeder()
+    seeder_task = asyncio.create_task(seeder.run(c_master))
+    await seeder.wait_until_n_inserts(50000)
+    seeder.stop()
+    await seeder_task
+
+    await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
+
+    # wait for full sync
+    async with async_timeout.timeout(3):
+        await wait_for_replicas_state(c_replica, state="full_sync", timeout=0.05)
+
+    await c_replica.execute_command("debug replica pause")
+
+    # Dragonfly would get stuck here without the bug fix. When replica does not read from the
+    # socket, Heartbeat() will block on the journal write for the expired items and shard_handler
+    # would never be called and break replication. More details on #3936.
+    await asyncio.sleep(6)
+
+    await c_replica.execute_command("debug replica resume")  # resume replication
+    await asyncio.sleep(1)  # replica will start resync
+
+    await check_all_replicas_finished([c_replica], c_master)
+    await assert_replica_reconnections(replica, 0)
+
+
 @pytest.mark.slow
 @pytest.mark.asyncio
 async def test_big_containers(df_factory):
     master = df_factory.create(proactor_threads=4)
     replica = df_factory.create(proactor_threads=4)

tests/dragonfly/utility.py

@@ -725,3 +725,25 @@ def assert_eventually(wrapped=None, *, times=100):
 def skip_if_not_in_github():
     if os.getenv("GITHUB_ACTIONS") == None:
         pytest.skip("Redis server not found")
+
+
+class ExpirySeeder:
+    def __init__(self):
+        self.stop_flag = False
+        self.i = 0
+        self.batch_size = 200
+
+    async def run(self, client):
+        while not self.stop_flag:
+            pipeline = client.pipeline(transaction=True)
+            for i in range(0, self.batch_size):
+                pipeline.execute_command(f"SET tmp{self.i} bar{self.i} EX 3")
+                self.i = self.i + 1
+            await pipeline.execute()
+
+    async def wait_until_n_inserts(self, count):
+        while not self.i > count:
+            await asyncio.sleep(0.5)
+
+    def stop(self):
+        self.stop_flag = True
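
For reference, the new test drives ExpirySeeder with this pattern; the same flow works standalone, assuming an asyncio-compatible Redis client connected to the server (seed_expiring_keys is a hypothetical name):

async def seed_expiring_keys(client):
    seeder = ExpirySeeder()
    # run() keeps issuing SET ... EX 3 in pipelined batches of 200
    seeder_task = asyncio.create_task(seeder.run(client))
    await seeder.wait_until_n_inserts(50000)  # polls seeder.i every 0.5s
    seeder.stop()  # stop_flag is checked once per batch, so this is prompt
    await seeder_task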