mirror of https://github.com/dragonflydb/dragonfly.git

chore: ScheduleInternal refactoring (#3794)

A small refactoring to improve the flow of ScheduleInternal() as well as
to prepare it for the next change that will reduce the CPU load from the shard queue.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman authored 2024-09-26 08:14:26 +03:00, committed by GitHub
parent a5d34adc4c
commit 70ad113e4b

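Before reading the diff, it helps to see the shape of the code being refactored: ScheduleInternal() fans a scheduling callback out to every shard the transaction touches, counts failures in an atomic counter, and waits on a barrier before deciding whether to retry. The sketch below is a hypothetical, self-contained illustration of that fan-out/join pattern only — it substitutes std::thread for Dragonfly's shard queue and fiber-based run_barrier_, and shard_count and the function bodies are invented for the example:

#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

// Hypothetical stand-in for Dragonfly's ScheduleContext: the state shared by
// all per-shard callbacks lives in one object on the coordinator's stack.
struct ScheduleContext {
  bool optimistic_execution = false;
  std::atomic_uint32_t fail_cnt{0};
};

// Stand-in for Transaction::ScheduleInShard(); always succeeds here.
bool ScheduleInShard(unsigned shard_id, bool optimistic) {
  std::printf("shard %u: scheduled (optimistic=%d)\n", shard_id, optimistic);
  return true;
}

int main() {
  const unsigned shard_count = 4;  // illustrative; the real code uses unique_shard_cnt_
  ScheduleContext ctx{/*optimistic_execution=*/true};

  // Fan out: one callback per shard, each capturing only the shared context.
  std::vector<std::thread> shards;
  for (unsigned i = 0; i < shard_count; ++i) {
    shards.emplace_back([&ctx, i] {
      if (!ScheduleInShard(i, ctx.optimistic_execution))
        ctx.fail_cnt.fetch_add(1, std::memory_order_relaxed);
      // The real callback calls FinishHop() here to signal run_barrier_.
    });
  }

  // Join: the coordinator blocks until every shard has answered
  // (run_barrier_.Wait() in Dragonfly).
  for (auto& t : shards)
    t.join();

  // Success only if no shard failed; otherwise the real code cancels the
  // partial schedule and retries.
  if (ctx.fail_cnt.load(std::memory_order_relaxed) == 0)
    std::puts("scheduled on all shards");
}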

@@ -91,6 +91,15 @@ bool CheckLocks(const DbSlice& db_slice, IntentLock::Mode mode, const KeyLockArg
   return true;
 }
 
+struct ScheduleContext {
+  Transaction* trans;
+  bool optimistic_execution = false;
+
+  std::atomic_uint32_t fail_cnt{0};
+
+  ScheduleContext(Transaction* t, bool optimistic) : trans(t), optimistic_execution(optimistic) {
+  }
+};
 }  // namespace
 
 bool Transaction::BatonBarrier::IsClaimed() const {
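Note on the design choice: previously the per-shard callback captured this plus a reference to a stack-local schedule_fails counter, so a queued callback was only meaningful within that one coordinator stack frame. Folding trans, optimistic_execution, and fail_cnt into a single ScheduleContext makes each schedule request a self-contained object. A plausible reading — the commit message only promises to "prepare" for a change that reduces shard-queue CPU load — is that such a context object can later be handed to shards through a cheaper mechanism than per-shard lambda dispatch; the commit itself does not spell out the mechanism.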
@@ -703,35 +712,39 @@ void Transaction::ScheduleInternal() {
     InitTxTime();
 
-    atomic_uint32_t schedule_fails = 0;
-    auto cb = [this, &schedule_fails, can_run_immediately]() {
-      if (!ScheduleInShard(EngineShard::tlocal(), can_run_immediately)) {
-        schedule_fails.fetch_add(1, memory_order_relaxed);
-      }
-      FinishHop();
-    };
-
     run_barrier_.Start(unique_shard_cnt_);
     if (CanRunInlined()) {
+      // We increase the barrier above for this branch as well, in order to calm the DCHECKs
+      // in the lower-level code. It's not really needed otherwise because we run inline.
+
       // single shard schedule operation can't fail
       CHECK(ScheduleInShard(EngineShard::tlocal(), can_run_immediately));
       run_barrier_.Dec();
-    } else {
-      IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
-
-      // Add this debugging function to print more information when we experience deadlock
-      // during tests.
-      ThisFiber::PrintLocalsCallback locals([&] {
-        return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_,
-                            " run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n");
-      });
-      run_barrier_.Wait();
-    }
-
-    if (schedule_fails.load(memory_order_relaxed) == 0) {
-      coordinator_state_ |= COORD_SCHED;
-      RecordTxScheduleStats(this);
+      break;
+    }
+
+    ScheduleContext schedule_ctx{this, can_run_immediately};
+
+    auto cb = [&schedule_ctx]() {
+      if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(),
+                                               schedule_ctx.optimistic_execution)) {
+        schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed);
+      }
+      schedule_ctx.trans->FinishHop();
+    };
+
+    IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
+
+    // Add this debugging function to print more information when we experience deadlock
+    // during tests.
+    ThisFiber::PrintLocalsCallback locals([&] {
+      return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_,
+                          " run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n");
+    });
+    run_barrier_.Wait();
+
+    if (schedule_ctx.fail_cnt.load(memory_order_relaxed) == 0) {
       break;
     }
@@ -762,6 +775,9 @@ void Transaction::ScheduleInternal() {
       });
     }
   }
+
+  coordinator_state_ |= COORD_SCHED;
+  RecordTxScheduleStats(this);
 }
 
 // Runs in the coordinator fiber.
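Putting the three hunks together, the refactored ScheduleInternal() reads roughly as follows. This is a condensed paraphrase assembled only from the hunks above, not the verbatim function: the loop header is implied by the break statements, and the cancel-and-retry code between the second and third hunks is unchanged and elided.

// Condensed paraphrase of the post-refactoring flow (not verbatim source).
void Transaction::ScheduleInternal() {
  // can_run_immediately is computed earlier in the function (not shown).
  while (true) {  // retry loop; header unchanged, implied by the breaks above
    InitTxTime();
    run_barrier_.Start(unique_shard_cnt_);

    if (CanRunInlined()) {
      // Single-shard inline scheduling cannot fail; leave the loop directly.
      CHECK(ScheduleInShard(EngineShard::tlocal(), can_run_immediately));
      run_barrier_.Dec();
      break;
    }

    ScheduleContext schedule_ctx{this, can_run_immediately};
    // ... fan cb out to all active shards, then run_barrier_.Wait() ...

    if (schedule_ctx.fail_cnt.load(memory_order_relaxed) == 0)
      break;  // every shard scheduled successfully

    // ... otherwise cancel the partial schedule and retry (unchanged code) ...
  }

  // Hoisted out of the retry loop: runs exactly once, on success.
  coordinator_state_ |= COORD_SCHED;
  RecordTxScheduleStats(this);
}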