From bf2e5fd3f553ca0754e76a1c2af482baa0d1a36d Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 11 Jul 2024 16:55:53 +0300 Subject: [PATCH] feat: yield when serialization is in progress (#3220) * allow preemption when we serialize buckets * add condition variable to protect interleaved preemptions Signed-off-by: kostas --- src/server/common.h | 23 +++++++++++++++++++ src/server/journal/streamer.cc | 12 +++++----- src/server/journal/streamer.h | 5 ++++- src/server/snapshot.cc | 41 +++++++++++++++++++++------------- src/server/snapshot.h | 5 ++++- 5 files changed, 63 insertions(+), 23 deletions(-) diff --git a/src/server/common.h b/src/server/common.h index 3260d5c38..876d0a65a 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -359,4 +359,27 @@ class UniquePicksGenerator : public PicksGenerator { absl::BitGen bitgen_{}; }; +struct ConditionFlag { + util::fb2::CondVarAny cond_var; + bool flag = false; +}; + +// Helper class used to guarantee atomicity between serialization of buckets +class ConditionGuard { + public: + explicit ConditionGuard(ConditionFlag* enclosing) : enclosing_(enclosing) { + util::fb2::NoOpLock noop_lk_; + enclosing_->cond_var.wait(noop_lk_, [this]() { return !enclosing_->flag; }); + enclosing_->flag = true; + } + + ~ConditionGuard() { + enclosing_->flag = false; + enclosing_->cond_var.notify_one(); + } + + private: + ConditionFlag* enclosing_; +}; + } // namespace dfly diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index b6a6f98d0..6c6e00669 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -1,4 +1,4 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2024, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // @@ -9,6 +9,7 @@ #include "base/flags.h" #include "base/logging.h" #include "server/cluster/cluster_defs.h" +#include "util/fibers/synchronization.h" using namespace facade; @@ -211,6 +212,8 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { bool written = false; cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) { + ConditionGuard guard(&bucket_ser_); + db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/, DbSlice::Iterator::FromPrime(it), snapshot_version_); if (WriteBucket(it)) { @@ -280,10 +283,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const { } bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { - // Can't switch fibers because that could invalidate iterator or cause bucket splits which may - // move keys between buckets. - FiberAtomicGuard fg; - bool written = false; if (it.GetVersion() < snapshot_version_) { @@ -312,7 +311,8 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0"; - FiberAtomicGuard fg; + ConditionGuard guard(&bucket_ser_); + PrimeTable* table = db_slice_->GetTables(0).first; if (const PrimeTable::bucket_iterator* bit = req.update()) { diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 9cef8352b..0ba178f3b 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -1,9 +1,10 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2024, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // #pragma once +#include "server/common.h" #include "server/db_slice.h" #include "server/journal/journal.h" #include "server/journal/serializer.h" @@ -102,6 +103,8 @@ class RestoreStreamer : public JournalStreamer { cluster::SlotSet my_slots_; bool fiber_cancelled_ = false; bool snapshot_finished_ = false; + + ConditionFlag bucket_ser_; }; } // namespace dfly diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 8a202954d..6ddeee3ff 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -1,4 +1,4 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2024, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // @@ -8,6 +8,8 @@ #include #include +#include + #include "base/logging.h" #include "core/heap_size.h" #include "server/db_slice.h" @@ -16,6 +18,7 @@ #include "server/rdb_extensions.h" #include "server/rdb_save.h" #include "server/tiered_storage.h" +#include "util/fibers/synchronization.h" namespace dfly { @@ -235,30 +238,35 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn } bool SliceSnapshot::BucketSaveCb(PrimeIterator it) { + ConditionGuard guard(&bucket_ser_); + ++stats_.savecb_calls; + auto check = [&](auto v) { + if (v >= snapshot_version_) { + // either has been already serialized or added after snapshotting started. + DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id() + << " at " << v; + ++stats_.skipped; + return false; + } + return true; + }; + uint64_t v = it.GetVersion(); - if (v >= snapshot_version_) { - // either has been already serialized or added after snapshotting started. - DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id() - << " at " << v; - ++stats_.skipped; + if (!check(v)) { return false; } + db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it), snapshot_version_); stats_.loop_serialized += SerializeBucket(current_db_, it); + return false; } unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) { - // Must be atomic because after after we call it.snapshot_version_ we're starting - // to send incremental updates instead of serializing the whole bucket: We must not - // send the update until the initial SerializeBucket is called. - // Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this - // bucket. - FiberAtomicGuard fg; DCHECK_LT(it.GetVersion(), snapshot_version_); // traverse physical bucket and write it into string file. @@ -268,6 +276,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite while (!it.is_done()) { ++result; + // might yield SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get()); ++it; } @@ -330,10 +339,12 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) { } void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { - FiberAtomicGuard fg; - PrimeTable* table = db_slice_->GetTables(db_index).first; + ConditionGuard guard(&bucket_ser_); - if (const PrimeTable::bucket_iterator* bit = req.update()) { + PrimeTable* table = db_slice_->GetTables(db_index).first; + const PrimeTable::bucket_iterator* bit = req.update(); + + if (bit) { if (bit->GetVersion() < snapshot_version_) { stats_.side_saved += SerializeBucket(db_index, *bit); } diff --git a/src/server/snapshot.h b/src/server/snapshot.h index da7962d2e..af58cf4d9 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -1,4 +1,4 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2024, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // @@ -10,6 +10,7 @@ #include "base/pod_array.h" #include "core/size_tracking_channel.h" #include "io/file.h" +#include "server/common.h" #include "server/db_slice.h" #include "server/rdb_save.h" #include "server/table.h" @@ -171,6 +172,8 @@ class SliceSnapshot { size_t savecb_calls = 0; size_t keys_total = 0; } stats_; + + ConditionFlag bucket_ser_; }; } // namespace dfly