diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 04d1618e2..ef6cf8a3f 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -91,6 +91,15 @@ bool CheckLocks(const DbSlice& db_slice, IntentLock::Mode mode, const KeyLockArg
   return true;
 }
 
+struct ScheduleContext {
+  Transaction* trans;
+  bool optimistic_execution = false;
+  std::atomic_uint32_t fail_cnt{0};
+
+  ScheduleContext(Transaction* t, bool optimistic) : trans(t), optimistic_execution(optimistic) {
+  }
+};
+
 }  // namespace
 
 bool Transaction::BatonBarrier::IsClaimed() const {
@@ -703,35 +712,39 @@ void Transaction::ScheduleInternal() {
     InitTxTime();
 
-    atomic_uint32_t schedule_fails = 0;
-    auto cb = [this, &schedule_fails, can_run_immediately]() {
-      if (!ScheduleInShard(EngineShard::tlocal(), can_run_immediately)) {
-        schedule_fails.fetch_add(1, memory_order_relaxed);
-      }
-      FinishHop();
-    };
-
     run_barrier_.Start(unique_shard_cnt_);
+
     if (CanRunInlined()) {
       // We increase the barrier above for this branch as well, in order to calm the DCHECKs
       // in the lower-level code. It's not really needed otherwise because we run inline.
+
+      // single shard schedule operation can't fail
       CHECK(ScheduleInShard(EngineShard::tlocal(), can_run_immediately));
       run_barrier_.Dec();
-    } else {
-      IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
-
-      // Add this debugging function to print more information when we experience deadlock
-      // during tests.
-      ThisFiber::PrintLocalsCallback locals([&] {
-        return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_,
-                            " run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n");
-      });
-      run_barrier_.Wait();
+      break;
     }
 
-    if (schedule_fails.load(memory_order_relaxed) == 0) {
-      coordinator_state_ |= COORD_SCHED;
+    ScheduleContext schedule_ctx{this, can_run_immediately};
 
-      RecordTxScheduleStats(this);
+    auto cb = [&schedule_ctx]() {
+      if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(),
+                                               schedule_ctx.optimistic_execution)) {
+        schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed);
+      }
+      schedule_ctx.trans->FinishHop();
+    };
+
+    IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
+
+    // Add this debugging function to print more information when we experience deadlock
+    // during tests.
+    ThisFiber::PrintLocalsCallback locals([&] {
+      return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_,
+                          " run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n");
+    });
+    run_barrier_.Wait();
+
+    if (schedule_ctx.fail_cnt.load(memory_order_relaxed) == 0) {
       break;
     }
@@ -762,6 +775,9 @@ void Transaction::ScheduleInternal() {
       });
     }
   }
+
+  coordinator_state_ |= COORD_SCHED;
+  RecordTxScheduleStats(this);
 }
 
 // Runs in the coordinator fiber.
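
For context: the patch replaces the per-attempt locals that the old `cb` captured by reference (`schedule_fails`, `can_run_immediately`) with a single `ScheduleContext` shared by all shard callbacks, and moves `coordinator_state_ |= COORD_SCHED` plus `RecordTxScheduleStats(this)` out of the retry loop so they run exactly once, after scheduling succeeds on every shard. Below is a minimal, self-contained C++ sketch of that fan-out/aggregate pattern, not the Dragonfly implementation itself: shards are modeled as plain std::threads instead of shard fibers, and `TryScheduleOnShard` / `g_conflicts` are hypothetical stand-ins for `Transaction::ScheduleInShard` and real lock contention.

// Standalone sketch of the scheduling pattern introduced by the patch, under
// the stated assumptions (threads instead of fibers, hypothetical helpers).
#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

struct ScheduleContext {  // mirrors the struct added in the patch
  int trans_id;           // stands in for Transaction* trans
  bool optimistic_execution = false;
  std::atomic_uint32_t fail_cnt{0};

  ScheduleContext(int t, bool optimistic) : trans_id(t), optimistic_execution(optimistic) {
  }
};

std::atomic<int> g_conflicts{2};  // pretend the first two shard attempts hit a lock conflict

bool TryScheduleOnShard(const ScheduleContext& ctx, unsigned shard_id) {
  (void)ctx;
  (void)shard_id;
  return g_conflicts.fetch_sub(1, std::memory_order_relaxed) <= 0;
}

int main() {
  constexpr unsigned kShards = 4;

  for (int attempt = 1;; ++attempt) {
    // One context per attempt; every shard callback records failures into it,
    // like the `cb` lambda capturing `schedule_ctx` by reference in the patch.
    ScheduleContext schedule_ctx{/*trans_id=*/42, /*optimistic=*/false};

    std::vector<std::thread> shards;
    for (unsigned i = 0; i < kShards; ++i) {
      shards.emplace_back([&schedule_ctx, i] {
        if (!TryScheduleOnShard(schedule_ctx, i))
          schedule_ctx.fail_cnt.fetch_add(1, std::memory_order_relaxed);
      });
    }
    for (auto& t : shards)
      t.join();  // plays the role of run_barrier_.Wait()

    if (schedule_ctx.fail_cnt.load(std::memory_order_relaxed) == 0) {
      std::printf("scheduled on all shards after %d attempt(s)\n", attempt);
      break;  // success: fall through to the once-only bookkeeping
    }
    // On failure the real code cancels the partial schedule and retries with a new txid.
  }

  // Equivalent of setting COORD_SCHED and calling RecordTxScheduleStats(this)
  // exactly once after the retry loop, as the patch arranges.
  return 0;
}

One design point the sketch makes visible: because each retry constructs a fresh `ScheduleContext` on the coordinator's stack and the barrier (here, `join`) guarantees all shard callbacks finish before the context is destroyed, the callbacks can safely hold a reference to it without any heap allocation or reference counting.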