
chore(transaction): Copy poll flags (#2596)

* chore(transaction): Copy poll flags

Copying the poll flags prevents concurrent access to PerShardData::local_mask when dispatching poll tasks.
---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Authored by Vladislav on 2024-02-15 11:55:02 +03:00, committed by GitHub
parent 28800df071
commit bbbcddfdd6
2 changed files with 28 additions and 14 deletions
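
In short, the change snapshots the indices of the active shards into a local std::bitset before the new phase is published, and then dispatches poll tasks from that snapshot instead of re-reading each shard's local_mask. Below is a minimal, self-contained sketch of this copy-before-publish pattern; ShardState, dispatch_poll_task, ExecutePhase and the phase counter are illustrative stand-ins, not Dragonfly's actual API.

#include <atomic>
#include <bitset>
#include <cstdint>
#include <cstdio>
#include <vector>

// Illustrative stand-ins, not Dragonfly's real types.
constexpr uint32_t ACTIVE = 1 << 0;
constexpr uint32_t ARMED = 1 << 1;

struct ShardState {
  uint32_t local_mask = 0;  // may be written concurrently once the phase is published
};

std::vector<ShardState> shards(4);
std::atomic<uint32_t> phase{0};

void dispatch_poll_task(size_t i) {  // stands in for shard_set->Add(i, poll_cb)
  std::printf("poll task dispatched to shard %zu\n", i);
}

void ExecutePhase() {
  // 1. Arm the active shards and snapshot their indices while this thread
  //    still has exclusive access to local_mask.
  std::bitset<1024> poll_flags;
  for (size_t i = 0; i < shards.size(); ++i) {
    if (shards[i].local_mask & ACTIVE) {
      shards[i].local_mask |= ARMED;
      poll_flags.set(i);
    }
  }

  // 2. Publish the new phase with release semantics. From here on, shard
  //    threads may read and write local_mask, so we must not touch it again.
  phase.fetch_add(1, std::memory_order_release);

  // 3. Dispatch poll tasks from the private snapshot, not from local_mask.
  for (size_t i = 0; i < shards.size(); ++i) {
    if (poll_flags.test(i))
      dispatch_poll_task(i);
  }
}

int main() {
  shards[1].local_mask |= ACTIVE;
  shards[3].local_mask |= ACTIVE;
  ExecutePhase();  // dispatches to shards 1 and 3
}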

src/server/transaction.cc

@@ -760,12 +760,9 @@ void Transaction::ScheduleInternal() {
     // We do not need to wait for this callback to finish - just make sure it will eventually run.
     // See https://github.com/dragonflydb/dragonfly/issues/150 for more info.
     if (should_poll_execution.load(memory_order_relaxed)) {
-      for (uint32_t i = 0; i < shard_set->size(); ++i) {
-        if (!is_active(i))
-          continue;
-
+      IterateActiveShards([](const auto& sd, auto i) {
         shard_set->Add(i, [] { EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr); });
-      }
+      });
     }
   }
 }
@@ -871,6 +868,8 @@ void Transaction::UnlockMulti() {
       ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;
 
   use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
+
+  DCHECK_EQ(shard_data_.size(), shard_set->size());
   for (ShardId i = 0; i < shard_data_.size(); ++i) {
     shard_set->Add(i, [this, sharded_keys, i, shard_journals_cnt]() {
       this->UnlockMultiShardCb((*sharded_keys)[i], EngineShard::tlocal(), shard_journals_cnt);
@@ -931,12 +930,18 @@ void Transaction::ExecuteAsync() {
   DCHECK_GT(unique_shard_cnt_, 0u);
   DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
   DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
+  DCHECK_LE(shard_data_.size(), 1024u);
 
-  // Set armed flags on all active shards
-  IterateActiveShards([](auto& sd, auto i) { sd.local_mask |= ARMED; });
+  // Set armed flags on all active shards. Copy indices for dispatching poll tasks,
+  // because local_mask can be written concurrently after starting a new phase.
+  std::bitset<1024> poll_flags(0);
+  IterateActiveShards([&poll_flags](auto& sd, auto i) {
+    sd.local_mask |= ARMED;
+    poll_flags.set(i, true);
+  });
 
   // Start new phase: release semantics. From here we can be discovered by IsArmedInShard(),
-  // and thus picked by a foreign thread's PollExecution(). Careful with writes until phase end!
+  // and thus picked by a foreign thread's PollExecution(). Careful with data access!
   run_barrier_.Start(unique_shard_cnt_);
 
   auto* ss = ServerState::tlocal();
@@ -954,7 +959,10 @@ void Transaction::ExecuteAsync() {
     DVLOG(3) << "ptr_release " << DebugId();
     intrusive_ptr_release(this);  // against use_count_.fetch_add above.
   };
-  IterateActiveShards([&poll_cb](PerShardData& sd, auto i) { shard_set->Add(i, poll_cb); });
+  IterateShards([&poll_cb, &poll_flags](PerShardData& sd, auto i) {
+    if (poll_flags.test(i))
+      shard_set->Add(i, poll_cb);
+  });
 }
 
 void Transaction::Conclude() {
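
A note on sizing: std::bitset has a compile-time capacity, which is why the hunk above also adds DCHECK_LE(shard_data_.size(), 1024u). The snapshot occupies 128 bytes on the stack and never allocates, but it caps the number of shards it can record. The sketch below contrasts this with a dynamically sized alternative; it is illustrative only and not part of the commit.

#include <bitset>
#include <cstddef>
#include <vector>

// Fixed capacity: 128 bytes on the stack, no heap allocation, but at most
// 1024 shards can be recorded (hence the DCHECK_LE in the diff above).
std::bitset<1024> SnapshotFixed(const std::vector<bool>& active) {
  std::bitset<1024> flags;
  for (size_t i = 0; i < active.size(); ++i)
    if (active[i])
      flags.set(i);
  return flags;
}

// Dynamic alternative: handles any shard count, but heap-allocates on a hot
// path, which a fixed bitset avoids.
std::vector<bool> SnapshotDynamic(const std::vector<bool>& active) {
  return active;  // copying the vector is itself the snapshot
}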

src/server/transaction.h

@@ -541,19 +541,25 @@ class Transaction {
     return sid < shard_data_.size() ? sid : 0;
   }
 
-  // Iterate over shards and run function accepting (PerShardData&, ShardId) on all active ones.
-  template <typename F> void IterateActiveShards(F&& f) {
+  // Iterate over all available shards, run functor accepting (PerShardData&, ShardId)
+  template <typename F> void IterateShards(F&& f) {
     if (unique_shard_cnt_ == 1) {
       f(shard_data_[SidToId(unique_shard_id_)], unique_shard_id_);
     } else {
       for (ShardId i = 0; i < shard_data_.size(); ++i) {
-        if (auto& sd = shard_data_[i]; sd.local_mask & ACTIVE) {
-          f(sd, i);
-        }
+        f(shard_data_[i], i);
       }
     }
   }
 
+  // Iterate over ACTIVE shards, run functor accepting (PerShardData&, ShardId)
+  template <typename F> void IterateActiveShards(F&& f) {
+    IterateShards([&f](auto& sd, auto i) {
+      if (sd.local_mask & ACTIVE)
+        f(sd, i);
+    });
+  }
+
  private:
   // Main synchronization point for dispatching hop callbacks and waiting for them to finish.
   // After scheduling, sequential hops are executed as follows:
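
The header change layers the two iterators: IterateShards now visits every slot (keeping the single-shard fast path), while IterateActiveShards becomes a thin filter on top of it. A standalone sketch of the same layering follows, with simplified types and the fast path omitted:

#include <cstdint>
#include <iostream>
#include <vector>

constexpr uint32_t ACTIVE = 1 << 0;

struct PerShard {
  uint32_t local_mask = 0;
};

struct Tx {
  std::vector<PerShard> shard_data_;

  // Visit every shard slot unconditionally.
  template <typename F> void IterateShards(F&& f) {
    for (uint32_t i = 0; i < shard_data_.size(); ++i)
      f(shard_data_[i], i);
  }

  // Visit only shards whose ACTIVE bit is set, built on IterateShards.
  template <typename F> void IterateActiveShards(F&& f) {
    IterateShards([&f](auto& sd, auto i) {
      if (sd.local_mask & ACTIVE)
        f(sd, i);
    });
  }
};

int main() {
  Tx tx;
  tx.shard_data_.resize(4);
  tx.shard_data_[1].local_mask |= ACTIVE;
  tx.shard_data_[3].local_mask |= ACTIVE;

  tx.IterateActiveShards(
      [](auto&, auto i) { std::cout << "active shard " << i << "\n"; });
  // Prints: active shard 1, active shard 3.
}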