1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

feat: yield when serialization is in progress (#3220)

* allow preemption when we serialize buckets
* add condition variable to protect interleaved preemptions

Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
Kostas Kyrimis 2024-07-11 16:55:53 +03:00 committed by GitHub
parent f9ded47c3d
commit bf2e5fd3f5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 63 additions and 23 deletions

View file

@ -359,4 +359,27 @@ class UniquePicksGenerator : public PicksGenerator {
absl::BitGen bitgen_{}; absl::BitGen bitgen_{};
}; };
struct ConditionFlag {
util::fb2::CondVarAny cond_var;
bool flag = false;
};
// Helper class used to guarantee atomicity between serialization of buckets
class ConditionGuard {
public:
explicit ConditionGuard(ConditionFlag* enclosing) : enclosing_(enclosing) {
util::fb2::NoOpLock noop_lk_;
enclosing_->cond_var.wait(noop_lk_, [this]() { return !enclosing_->flag; });
enclosing_->flag = true;
}
~ConditionGuard() {
enclosing_->flag = false;
enclosing_->cond_var.notify_one();
}
private:
ConditionFlag* enclosing_;
};
} // namespace dfly } // namespace dfly

View file

@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved. // Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
@ -9,6 +9,7 @@
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "server/cluster/cluster_defs.h" #include "server/cluster/cluster_defs.h"
#include "util/fibers/synchronization.h"
using namespace facade; using namespace facade;
@ -211,6 +212,8 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
bool written = false; bool written = false;
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) { cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
ConditionGuard guard(&bucket_ser_);
db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/, db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
DbSlice::Iterator::FromPrime(it), snapshot_version_); DbSlice::Iterator::FromPrime(it), snapshot_version_);
if (WriteBucket(it)) { if (WriteBucket(it)) {
@ -280,10 +283,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {
} }
bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
// Can't switch fibers because that could invalidate iterator or cause bucket splits which may
// move keys between buckets.
FiberAtomicGuard fg;
bool written = false; bool written = false;
if (it.GetVersion() < snapshot_version_) { if (it.GetVersion() < snapshot_version_) {
@ -312,7 +311,8 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0"; DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";
FiberAtomicGuard fg; ConditionGuard guard(&bucket_ser_);
PrimeTable* table = db_slice_->GetTables(0).first; PrimeTable* table = db_slice_->GetTables(0).first;
if (const PrimeTable::bucket_iterator* bit = req.update()) { if (const PrimeTable::bucket_iterator* bit = req.update()) {

View file

@ -1,9 +1,10 @@
// Copyright 2022, DragonflyDB authors. All rights reserved. // Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
#pragma once #pragma once
#include "server/common.h"
#include "server/db_slice.h" #include "server/db_slice.h"
#include "server/journal/journal.h" #include "server/journal/journal.h"
#include "server/journal/serializer.h" #include "server/journal/serializer.h"
@ -102,6 +103,8 @@ class RestoreStreamer : public JournalStreamer {
cluster::SlotSet my_slots_; cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false; bool fiber_cancelled_ = false;
bool snapshot_finished_ = false; bool snapshot_finished_ = false;
ConditionFlag bucket_ser_;
}; };
} // namespace dfly } // namespace dfly

View file

@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved. // Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
@ -8,6 +8,8 @@
#include <absl/strings/match.h> #include <absl/strings/match.h>
#include <absl/strings/str_cat.h> #include <absl/strings/str_cat.h>
#include <mutex>
#include "base/logging.h" #include "base/logging.h"
#include "core/heap_size.h" #include "core/heap_size.h"
#include "server/db_slice.h" #include "server/db_slice.h"
@ -16,6 +18,7 @@
#include "server/rdb_extensions.h" #include "server/rdb_extensions.h"
#include "server/rdb_save.h" #include "server/rdb_save.h"
#include "server/tiered_storage.h" #include "server/tiered_storage.h"
#include "util/fibers/synchronization.h"
namespace dfly { namespace dfly {
@ -235,30 +238,35 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
} }
bool SliceSnapshot::BucketSaveCb(PrimeIterator it) { bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
ConditionGuard guard(&bucket_ser_);
++stats_.savecb_calls; ++stats_.savecb_calls;
auto check = [&](auto v) {
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
++stats_.skipped;
return false;
}
return true;
};
uint64_t v = it.GetVersion(); uint64_t v = it.GetVersion();
if (v >= snapshot_version_) { if (!check(v)) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
++stats_.skipped;
return false; return false;
} }
db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it), db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it),
snapshot_version_); snapshot_version_);
stats_.loop_serialized += SerializeBucket(current_db_, it); stats_.loop_serialized += SerializeBucket(current_db_, it);
return false; return false;
} }
unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) { unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) {
// Must be atomic because after after we call it.snapshot_version_ we're starting
// to send incremental updates instead of serializing the whole bucket: We must not
// send the update until the initial SerializeBucket is called.
// Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
// bucket.
FiberAtomicGuard fg;
DCHECK_LT(it.GetVersion(), snapshot_version_); DCHECK_LT(it.GetVersion(), snapshot_version_);
// traverse physical bucket and write it into string file. // traverse physical bucket and write it into string file.
@ -268,6 +276,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
while (!it.is_done()) { while (!it.is_done()) {
++result; ++result;
// might yield
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get()); SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
++it; ++it;
} }
@ -330,10 +339,12 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
} }
void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
FiberAtomicGuard fg; ConditionGuard guard(&bucket_ser_);
PrimeTable* table = db_slice_->GetTables(db_index).first;
if (const PrimeTable::bucket_iterator* bit = req.update()) { PrimeTable* table = db_slice_->GetTables(db_index).first;
const PrimeTable::bucket_iterator* bit = req.update();
if (bit) {
if (bit->GetVersion() < snapshot_version_) { if (bit->GetVersion() < snapshot_version_) {
stats_.side_saved += SerializeBucket(db_index, *bit); stats_.side_saved += SerializeBucket(db_index, *bit);
} }

View file

@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved. // Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
@ -10,6 +10,7 @@
#include "base/pod_array.h" #include "base/pod_array.h"
#include "core/size_tracking_channel.h" #include "core/size_tracking_channel.h"
#include "io/file.h" #include "io/file.h"
#include "server/common.h"
#include "server/db_slice.h" #include "server/db_slice.h"
#include "server/rdb_save.h" #include "server/rdb_save.h"
#include "server/table.h" #include "server/table.h"
@ -171,6 +172,8 @@ class SliceSnapshot {
size_t savecb_calls = 0; size_t savecb_calls = 0;
size_t keys_total = 0; size_t keys_total = 0;
} stats_; } stats_;
ConditionFlag bucket_ser_;
}; };
} // namespace dfly } // namespace dfly