1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: remove fiber guard from non atomic section (#3381)

We might preempt when we serialize a big value and the code in journal was protected by an atomic guard triggering a check failed.

* remove fiber guard from non atomic section
* move LocalBlockingCounter to common

---------

Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
Kostas Kyrimis 2024-07-25 16:06:35 +03:00 committed by GitHub
parent e2d65a0900
commit 4b851be57a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 30 additions and 31 deletions

View file

@ -15,6 +15,7 @@
#include <string_view>
#include <vector>
#include "base/logging.h"
#include "facade/facade_types.h"
#include "facade/op_status.h"
#include "util/fibers/fibers.h"
@ -381,4 +382,28 @@ class ConditionGuard {
ConditionFlag* enclosing_;
};
class LocalBlockingCounter {
public:
void lock() {
++mutating_;
}
void unlock() {
DCHECK(mutating_ > 0);
--mutating_;
if (mutating_ == 0) {
cond_var_.notify_one();
}
}
void Wait() {
util::fb2::NoOpLock noop_lk_;
cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; });
}
private:
util::fb2::CondVarAny cond_var_;
size_t mutating_ = 0;
};
} // namespace dfly

View file

@ -542,30 +542,6 @@ class DbSlice {
void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const;
class LocalBlockingCounter {
public:
void lock() {
++mutating_;
}
void unlock() {
DCHECK(mutating_ > 0);
--mutating_;
if (mutating_ == 0) {
cond_var_.notify_one();
}
}
void Wait() {
util::fb2::NoOpLock noop_lk_;
cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; });
}
private:
util::fb2::CondVarAny cond_var_;
size_t mutating_ = 0;
};
// We need this because registered callbacks might yield. If RegisterOnChange
// gets called after we preempt while iterating over the registered callbacks
// (let's say in FlushChangeToEarlierCallbacks) we will get UB, because we pushed

View file

@ -143,11 +143,6 @@ std::string_view JournalSlice::GetEntry(LSN lsn) const {
}
void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
optional<FiberAtomicGuard> guard;
if (!await) {
guard.emplace(); // Guard is non-movable/copyable, so we must use emplace()
}
DCHECK(ring_buffer_);
JournalItem dummy;
@ -192,8 +187,11 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString()
<< " num callbacks: " << change_cb_arr_.size();
for (const auto& k_v : change_cb_arr_) {
k_v.second(*item, await);
const size_t size = change_cb_arr_.size();
auto k_v = change_cb_arr_.begin();
for (size_t i = 0; i < size; ++i) {
k_v->second(*item, await);
++k_v;
}
}
}