diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 2440b8c0a..799abd5b7 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -12,15 +12,16 @@ namespace dfly { class OutgoingMigration::SliceSlotMigration { public: SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal, - Context* cntx) + Context* cntx, io::Sink* dest) : streamer_(slice, std::move(slots), sync_id, journal, cntx) { - } - - void Start(io::Sink* dest) { streamer_.Start(dest); state_ = MigrationState::C_FULL_SYNC; } + ~SliceSlotMigration() { + streamer_.Cancel(); + } + MigrationState GetState() const { return state_ == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished() ? MigrationState::C_STABLE_SYNC @@ -52,8 +53,7 @@ void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Jou std::lock_guard lck(flows_mu_); slot_migrations_[shard_id] = - std::make_unique(slice, std::move(sset), sync_id, journal, &cntx_); - slot_migrations_[shard_id]->Start(dest); + std::make_unique(slice, std::move(sset), sync_id, journal, &cntx_, dest); } MigrationState OutgoingMigration::GetState() const { diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 636de415a..d2ae854f8 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -99,12 +99,6 @@ void RestoreStreamer::Cancel() { JournalStreamer::Cancel(); } -RestoreStreamer::~RestoreStreamer() { - fiber_cancellation_.Cancel(); - snapshot_fb_.JoinIfNeeded(); - db_slice_->UnregisterOnChange(snapshot_version_); -} - bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const { if (!item.slot.has_value()) { return false; @@ -122,11 +116,10 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const { } void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { - DCHECK_LT(it.GetVersion(), snapshot_version_); - it.SetVersion(snapshot_version_); - bool is_data_present = false; - { + + if (it.GetVersion() < snapshot_version_) { + it.SetVersion(snapshot_version_); FiberAtomicGuard fg; // Can't switch fibers because that could invalidate iterator string key_buffer; // we can reuse it for (; !it.is_done(); ++it) { @@ -157,9 +150,7 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req PrimeTable* table = db_slice_->GetTables(0).first; if (const PrimeTable::bucket_iterator* bit = req.update()) { - if (bit->GetVersion() < snapshot_version_) { - WriteBucket(*bit); - } + WriteBucket(*bit); } else { string_view key = get(req.change); table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) { diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 7b184573b..dcf5a88a5 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -57,14 +57,13 @@ class RestoreStreamer : public JournalStreamer { Context* cntx); void Start(io::Sink* dest) override; + // Cancel() must be called if Start() is called void Cancel() override; bool IsSnapshotFinished() const { return snapshot_finished_; } - ~RestoreStreamer(); - private: void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); bool ShouldWrite(const journal::JournalItem& item) const override;