1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix(transaction): Add special barrier for blocking tx (#2512)

Refactor blocking transaction code. Introduce BatonBarrier for managing atomic and exclusive wakeup notifications that don't conflict with neither expiration nor cancelling

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-02-01 23:37:36 +03:00 committed by GitHub
parent 7e875bdafe
commit 40d08a3c67
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 106 additions and 70 deletions

View file

@ -82,6 +82,14 @@ void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inl
ss->stats.tx_width_freq_arr[0]++;
}
std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) {
using namespace chrono;
if (tp == Transaction::time_point::max())
return os << "inf";
size_t ms = duration_cast<milliseconds>(tp - Transaction::time_point::clock::now()).count();
return os << ms << "ms";
}
} // namespace
IntentLock::Mode Transaction::LockMode() const {
@ -116,6 +124,41 @@ uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
return count_.load(memory_order_relaxed);
}
bool Transaction::BatonBarrierrier::IsClaimed() const {
return claimed_.load(memory_order_relaxed);
}
bool Transaction::BatonBarrierrier::TryClaim() {
return !claimed_.exchange(true, memory_order_relaxed); // false means first means success
}
void Transaction::BatonBarrierrier::Close() {
DCHECK(claimed_.load(memory_order_relaxed));
closed_.store(true, memory_order_relaxed);
ec_.notify(); // release
}
cv_status Transaction::BatonBarrierrier::Wait(time_point tp) {
auto cb = [this] { return closed_.load(memory_order_acquire); };
if (tp != time_point::max()) {
// Wait until timepoint and return immediately if we finished without a timeout
if (ec_.await_until(cb, tp) == cv_status::no_timeout)
return cv_status::no_timeout;
// We timed out and claimed the barrier, so no one will be able to claim it anymore
if (TryClaim()) {
closed_.store(true, memory_order_relaxed); // Purely formal
return cv_status::timeout;
}
// fallthrough: otherwise a modification is in progress, wait for it below
}
ec_.await(cb);
return cv_status::no_timeout;
}
/**
* @brief Construct a new Transaction:: Transaction object
*
@ -1205,42 +1248,24 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider,
KeyReadyChecker krc) {
DVLOG(2) << "WaitOnWatch " << DebugId();
using namespace chrono;
DCHECK(!blocking_barrier_.IsClaimed()); // Blocking barrier can't be re-used
// Register keys on active shards blocking controllers and mark shard state as suspended.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto keys = wkeys_provider(t, shard);
return t->WatchInShard(keys, shard, krc);
};
Execute(std::move(cb), true);
coordinator_state_ |= COORD_BLOCKED;
auto wake_cb = [this] {
return (coordinator_state_ & COORD_CANCELLED) ||
wakeup_requested_.load(memory_order_relaxed) > 0;
};
auto* stats = ServerState::tl_connection_stats();
++stats->num_blocked_clients;
DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId();
if (DCHECK_IS_ON()) {
int64_t ms = -1;
if (tp != time_point::max())
ms = duration_cast<milliseconds>(tp - time_point::clock::now()).count();
DVLOG(1) << "WaitOnWatch TimeWait for " << ms << " ms " << DebugId();
}
cv_status status = cv_status::no_timeout;
if (tp == time_point::max()) {
blocking_ec_.await(std::move(wake_cb));
} else {
status = blocking_ec_.await_until(std::move(wake_cb), tp);
}
// Wait for the blocking barrier to be closed.
// Note: It might return immediately if another thread already notified us.
cv_status status = blocking_barrier_.Wait(tp);
DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId();
--stats->num_blocked_clients;
OpStatus result = OpStatus::OK;
@ -1250,38 +1275,32 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
result = local_result_;
}
// If we don't follow up with an "action" hop, we must clean up manually on all shards.
if (result != OpStatus::OK)
ExpireBlocking(wkeys_provider);
coordinator_state_ &= ~COORD_BLOCKED;
return result;
}
// Runs only in the shard thread.
OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard, KeyReadyChecker krc) {
ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[SidToId(shard->shard_id())];
auto& sd = shard_data_[idx];
CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);
auto* bc = shard->EnsureBlockingController();
bc->AddWatched(keys, std::move(krc), this);
sd.local_mask |= SUSPENDED_Q;
sd.local_mask &= ~OUT_OF_ORDER;
DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask
<< ", first_key:" << keys.front();
shard->EnsureBlockingController()->AddWatched(keys, std::move(krc), this);
DVLOG(2) << "WatchInShard " << DebugId() << ", first_key:" << keys.front();
return OpStatus::OK;
}
void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
// Blocking transactions don't release keys when suspending, release them now.
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(LockMode(), lock_args);
unsigned sd_idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sd_idx];
sd.local_mask |= EXPIRED_Q;
auto& sd = shard_data_[SidToId(shard->shard_id())];
sd.local_mask &= ~KEYLOCK_ACQUIRED;
shard->blocking_controller()->FinalizeWatched(wkeys, this);
@ -1361,40 +1380,32 @@ bool Transaction::IsGlobal() const {
// Returns true if the transacton has changed its state from suspended to awakened,
// false, otherwise.
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view key) {
unsigned idx = SidToId(sid);
auto& sd = shard_data_[idx];
unsigned local_mask = sd.local_mask;
if (local_mask & Transaction::EXPIRED_Q) {
return false;
}
// Wake a transaction only once on the first notify.
// We don't care about preserving the strict order with multiple operations running on blocking
// keys in parallel, because the internal order is not observable from outside either way.
if (wakeup_requested_.fetch_add(1, memory_order_relaxed) > 0)
if (!blocking_barrier_.TryClaim())
return false;
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by commited_id "
<< committed_txid;
auto& sd = shard_data_[SidToId(sid)];
// local_mask could be awaked (i.e. not suspended) if the transaction has been
// awakened by another key or awakened by the same key multiple times.
if (local_mask & SUSPENDED_Q) {
DCHECK_EQ(0u, local_mask & AWAKED_Q);
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << sd.local_mask
<< " by commited_id " << committed_txid;
sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;
// We're the first and only to wake this transaction, expect the shard to be suspended
CHECK(sd.local_mask & SUSPENDED_Q);
CHECK_EQ(sd.local_mask & AWAKED_Q, 0);
// Find index of awakened key.
auto args = GetShardArgs(sid);
auto it =
find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; });
DCHECK(it != args.end());
sd.wake_key_pos = it - args.begin();
}
// Find index of awakened key
auto args = GetShardArgs(sid);
auto it = find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; });
CHECK(it != args.end());
blocking_ec_.notify();
// Change state to awaked and store index of awakened key
sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;
sd.wake_key_pos = it - args.begin();
blocking_barrier_.Close();
return true;
}
@ -1474,7 +1485,10 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
}
void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
if ((coordinator_state_ & COORD_BLOCKED) == 0)
// We're on the owning thread of this transaction, so we can safely access it's data below.
// We still need to claim the blocking barrier, but as this function is often called blindly, we
// want to check first if it makes sense to even proceed.
if (blocking_barrier_.IsClaimed())
return;
OpStatus status = OpStatus::CANCELLED;
@ -1490,9 +1504,13 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
if (status == OpStatus::OK)
return;
// Check if someone else is about to wake us up
if (!blocking_barrier_.TryClaim())
return;
coordinator_state_ |= COORD_CANCELLED;
local_result_ = status;
blocking_ec_.notify();
blocking_barrier_.Close();
}
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {

View file

@ -162,8 +162,7 @@ class Transaction {
KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired
SUSPENDED_Q = 1 << 4, // Whether is suspended (by WatchInShard())
AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended())
EXPIRED_Q = 1 << 6, // Whether it timed out and should be dropped
UNLOCK_MULTI = 1 << 7, // Whether this shard executed UnlockMultiShardCb
UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb
};
public:
@ -412,8 +411,7 @@ class Transaction {
enum CoordinatorState : uint8_t {
COORD_SCHED = 1,
COORD_CONCLUDING = 1 << 1, // Whether its the last hop of a transaction
COORD_BLOCKED = 1 << 2,
COORD_CANCELLED = 1 << 3,
COORD_CANCELLED = 1 << 2,
};
// Auxiliary structure used during initialization
@ -443,6 +441,25 @@ class Transaction {
EventCount ec_{};
};
// "Single claim - single modification" barrier. Multiple threads might try to claim it, only one
// will succeed and will be allowed to modify the guarded object until it closes the barrier.
// A closed barrier can't be claimed again or re-used in any way.
class BatonBarrierrier {
public:
bool IsClaimed() const; // Return if barrier is claimed, only for peeking
bool TryClaim(); // Return if the barrier was claimed successfully
void Close(); // Close barrier after it was claimed
// Wait for barrier until time_point, or indefinitely if time_point::max() was passed.
// After Wait returns, the barrier is guaranteed to be closed, including expiration.
std::cv_status Wait(time_point);
private:
std::atomic_bool claimed_{false};
std::atomic_bool closed_{false};
EventCount ec_{};
};
private:
// Init basic fields and reset re-usable.
void InitBase(DbIndex dbid, CmdArgList args);
@ -484,6 +501,7 @@ class Transaction {
// Optimized version of RunInShard for single shard uncontended cases.
RunnableResult RunQuickie(EngineShard* shard);
// Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier
void ExecuteAsync();
// Adds itself to watched queue in the shard. Must run in that shard thread.
@ -592,8 +610,8 @@ class Transaction {
ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1
UniqueSlotChecker unique_slot_checker_;
std::atomic_uint32_t wakeup_requested_{0}; // incremented when blocking transaction gets notified
EventCount blocking_ec_; // to wait for wakeup_requested > 0 (or cancelled)
// Barrier for waking blocking transactions that ensures exclusivity of waking operation.
BatonBarrierrier blocking_barrier_{};
// Transaction coordinator state, written and read by coordinator thread.
uint8_t coordinator_state_ = 0;