diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index 8b4c21d73..ddfa4eb37 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -124,7 +124,11 @@ GenericError RdbSnapshot::Start(SaveMode save_mode, const std::string& path, } error_code RdbSnapshot::SaveBody() { - return saver_->SaveBody(&cntx_, &freq_map_); + return saver_->SaveBody(&cntx_); +} + +error_code RdbSnapshot::WaitSnapshotInShard(EngineShard* shard) { + return saver_->WaitSnapshotInShard(shard); } size_t RdbSnapshot::GetSaveBuffersSize() { @@ -132,6 +136,10 @@ size_t RdbSnapshot::GetSaveBuffersSize() { return saver_->GetTotalBuffersSize(); } +void RdbSnapshot::FillFreqMap() { + saver_->FillFreqMap(&freq_map_); +} + RdbSaver::SnapshotStats RdbSnapshot::GetCurrentSnapshotProgress() const { CHECK(saver_); return saver_->GetCurrentSnapshotProgress(); @@ -147,7 +155,7 @@ error_code RdbSnapshot::Close() { } void RdbSnapshot::StartInShard(EngineShard* shard) { - saver_->StartSnapshotInShard(false, cntx_.GetCancellation(), shard); + saver_->StartSnapshotInShard(false, &cntx_, shard); started_shards_.fetch_add(1, memory_order_relaxed); } @@ -176,7 +184,12 @@ std::optional SaveStagesController::InitResourcesAndStart() { } void SaveStagesController::WaitAllSnapshots() { - RunStage(&SaveStagesController::SaveCb); + if (use_dfs_format_) { + shard_set->RunBlockingInParallel([&](EngineShard* shard) { WaitSnapshotInShard(shard); }); + SaveBody(shard_set->size()); + } else { + SaveBody(0); + } } SaveInfo SaveStagesController::Finalize() { @@ -395,13 +408,22 @@ GenericError SaveStagesController::BuildFullPath() { return {}; } -void SaveStagesController::SaveCb(unsigned index) { - if (auto& snapshot = snapshots_[index].first; snapshot && snapshot->HasStarted()) +void SaveStagesController::SaveBody(unsigned index) { + CHECK(!use_dfs_format_ || index == shard_set->size()); // used in rdb and df summary file + if (auto& snapshot = snapshots_[index].first; snapshot && snapshot->HasStarted()) { shared_err_ = snapshot->SaveBody(); + } +} + +void SaveStagesController::WaitSnapshotInShard(EngineShard* shard) { + if (auto& snapshot = snapshots_[shard->shard_id()].first; snapshot && snapshot->HasStarted()) { + shared_err_ = snapshot->WaitSnapshotInShard(shard); + } } void SaveStagesController::CloseCb(unsigned index) { if (auto& snapshot = snapshots_[index].first; snapshot) { + snapshot->FillFreqMap(); shared_err_ = snapshot->Close(); unique_lock lk{rdb_name_map_mu_}; diff --git a/src/server/detail/save_stages_controller.h b/src/server/detail/save_stages_controller.h index 0b9db29c4..7446dffc3 100644 --- a/src/server/detail/save_stages_controller.h +++ b/src/server/detail/save_stages_controller.h @@ -46,6 +46,8 @@ class RdbSnapshot { void StartInShard(EngineShard* shard); error_code SaveBody(); + error_code WaitSnapshotInShard(EngineShard* shard); + void FillFreqMap(); error_code Close(); size_t GetSaveBuffersSize(); @@ -101,6 +103,8 @@ struct SaveStagesController : public SaveStagesInputs { // Start saving a dfs file on shard void SaveDfsSingle(EngineShard* shard); + void SaveSnashot(EngineShard* shard); + void WaitSnapshotInShard(EngineShard* shard); // Save a single rdb file void SaveRdb(); @@ -115,7 +119,7 @@ struct SaveStagesController : public SaveStagesInputs { // Build full path: get dir, try creating dirs, get filename with placeholder GenericError BuildFullPath(); - void SaveCb(unsigned index); + void SaveBody(unsigned index); void CloseCb(unsigned index); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index f76bd8413..0d6251d3c 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -121,7 +121,6 @@ void DflyCmd::ReplicaInfo::Cancel() { flow->cleanup(); } VLOG(2) << "After flow cleanup " << shard->shard_id(); - flow->full_sync_fb.JoinIfNeeded(); flow->conn = nullptr; }); // Wait for error handler to quit. @@ -371,7 +370,7 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) { FlowInfo* flow = &replica_ptr->flows[shard->shard_id()]; - StopFullSyncInThread(flow, shard); + StopFullSyncInThread(flow, &replica_ptr->cntx, shard); status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard); }; shard_set->RunBlockingInParallel(std::move(cb)); @@ -551,7 +550,6 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn } OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { - DCHECK(!flow->full_sync_fb.IsJoinable()); DCHECK(shard); DCHECK(flow->conn); @@ -569,7 +567,6 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha // callbacks are blocked on trying to insert to channel. flow->TryShutdownSocket(); flow->saver->CancelInShard(shard); // stops writing to journal stream to channel - flow->full_sync_fb.JoinIfNeeded(); // finishes poping data from channel flow->saver.reset(); }; @@ -588,18 +585,24 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha if (flow->start_partial_sync_at.has_value()) saver->StartIncrementalSnapshotInShard(cntx, shard, *flow->start_partial_sync_at); else - saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard); + saver->StartSnapshotInShard(true, cntx, shard); - flow->full_sync_fb = fb2::Fiber("full_sync", &DflyCmd::FullSyncFb, this, flow, cntx); return OpStatus::OK; } -void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) { +void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(shard); - flow->saver->StopFullSyncInShard(shard); + error_code ec = flow->saver->StopFullSyncInShard(shard); + if (ec) { + cntx->ReportError(ec); + return; + } - // Wait for full sync to finish. - flow->full_sync_fb.JoinIfNeeded(); + ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token)); + if (ec) { + cntx->ReportError(ec); + return; + } // Reset cleanup and saver flow->cleanup = []() {}; @@ -626,23 +629,6 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS return OpStatus::OK; } -void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) { - error_code ec; - - if (ec = flow->saver->SaveBody(cntx, nullptr); ec) { - if (!flow->conn->socket()->IsOpen()) - ec = make_error_code(errc::operation_canceled); // we cancelled the operation. - cntx->ReportError(ec); - return; - } - - ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token)); - if (ec) { - cntx->ReportError(ec); - return; - } -} - auto DflyCmd::CreateSyncSession(ConnectionState* state) -> std::pair { util::fb2::LockGuard lk(mu_); unsigned sync_id = next_sync_id_++; diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index d3bf07aa3..f67d7fee1 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -39,7 +39,6 @@ struct FlowInfo { facade::Connection* conn = nullptr; - util::fb2::Fiber full_sync_fb; // Full sync fiber. std::unique_ptr saver; // Saver for full sync phase. std::unique_ptr streamer; // Streamer for stable sync phase std::string eof_token; @@ -210,14 +209,11 @@ class DflyCmd { facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); // Stop full sync in thread. Run state switch cleanup. - void StopFullSyncInThread(FlowInfo* flow, EngineShard* shard); + void StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); // Start stable sync in thread. Called for each flow. facade::OpStatus StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); - // Fiber that runs full sync for each flow. - void FullSyncFb(FlowInfo* flow, Context* cntx); - // Get ReplicaInfo by sync_id. std::shared_ptr GetReplicaInfo(uint32_t sync_id) ABSL_LOCKS_EXCLUDED(mu_); diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 8718020b5..32257d621 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -29,6 +29,7 @@ extern "C" { #include "base/logging.h" #include "core/bloom.h" #include "core/json/json_object.h" +#include "core/size_tracking_channel.h" #include "core/sorted_map.h" #include "core/string_map.h" #include "core/string_set.h" @@ -834,6 +835,20 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { return error_code{}; } +error_code RdbSerializer::SendEofAndChecksum() { + VLOG(2) << "SendEof"; + /* EOF opcode */ + RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_EOF)); + + /* CRC64 checksum. It will be zero if checksum computation is disabled, the + * loading code skips the check in this case. */ + uint8_t buf[8]; + uint64_t chksum = 0; + + absl::little_endian::Store64(buf, chksum); + return WriteRaw(buf); +} + error_code RdbSerializer::SendJournalOffset(uint64_t journal_offset) { VLOG(2) << "SendJournalOffset"; RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_OFFSET)); @@ -1107,11 +1122,13 @@ class RdbSaver::Impl { ~Impl(); - void StartSnapshotting(bool stream_journal, const Cancellation* cll, EngineShard* shard); + void StartSnapshotting(bool stream_journal, Context* cntx, EngineShard* shard); void StartIncrementalSnapshotting(Context* cntx, EngineShard* shard, LSN start_lsn); void StopSnapshotting(EngineShard* shard); + void WaitForSnapshottingFinish(EngineShard* shard); + // used only for legacy rdb save flows. error_code ConsumeChannel(const Cancellation* cll); void FillFreqMap(RdbTypeFreqMap* dest) const; @@ -1143,6 +1160,8 @@ class RdbSaver::Impl { } private: + void PushSnapshotData(Context* cntx, string record); + void FinalizeSnapshotWriting(); error_code WriteRecord(io::Bytes src); unique_ptr& GetSnapshot(EngineShard* shard); @@ -1152,7 +1171,8 @@ class RdbSaver::Impl { vector> shard_snapshots_; // used for serializing non-body components in the calling fiber. RdbSerializer meta_serializer_; - SliceSnapshot::RecordChannel channel_; + using RecordChannel = SizeTrackingChannel>; + std::optional channel_; std::optional aligned_buf_; // Single entry compression is compatible with redis rdb snapshot @@ -1170,14 +1190,14 @@ RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode shard_snapshots_(producers_len), meta_serializer_(CompressionMode::NONE), // Note: I think there is not need for compression // at all in meta serializer - channel_{kChannelLen, producers_len}, compression_mode_(compression_mode) { if (align_writes) { aligned_buf_.emplace(kBufLen, sink); sink_ = &aligned_buf_.value(); } - - DCHECK(producers_len > 0 || channel_.IsClosing()); + if (sm == SaveMode::RDB) { + channel_.emplace(kChannelLen, producers_len); + } save_mode_ = sm; } @@ -1213,13 +1233,13 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val) error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { error_code io_error; - SliceSnapshot::DbRecord record; + string record; auto& stats = ServerState::tlocal()->stats; - + DCHECK(channel_.has_value()); // we can not exit on io-error since we spawn fibers that push data. // TODO: we may signal them to stop processing and exit asap in case of the error. - while (channel_.Pop(record)) { + while (channel_->Pop(record)) { if (io_error || cll->IsCancelled()) continue; @@ -1227,9 +1247,8 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { if (cll->IsCancelled()) continue; - DVLOG(2) << "Pulled " << record.id; auto start = absl::GetCurrentTimeNanos(); - io_error = WriteRecord(io::Buffer(record.value)); + io_error = WriteRecord(io::Buffer(record)); if (io_error) { break; // from the inner TryPop loop. } @@ -1237,15 +1256,15 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { auto delta_usec = (absl::GetCurrentTimeNanos() - start) / 1'000; stats.rdb_save_usec += delta_usec; stats.rdb_save_count++; - } while ((channel_.TryPop(record))); + } while ((channel_->TryPop(record))); } // while (channel_.Pop()) for (auto& ptr : shard_snapshots_) { - ptr->Join(); + ptr->WaitSnapshotting(); } VLOG(1) << "ConsumeChannel finished " << io_error; - DCHECK(!channel_.TryPop(record)); + DCHECK(!channel_->TryPop(record)); return io_error; } @@ -1278,32 +1297,70 @@ error_code RdbSaver::Impl::WriteRecord(io::Bytes src) { return ec; } -void RdbSaver::Impl::StartSnapshotting(bool stream_journal, const Cancellation* cll, - EngineShard* shard) { +void RdbSaver::Impl::PushSnapshotData(Context* cntx, string record) { + if (cntx->IsCancelled()) { + return; + } + if (channel_) { // Rdb write to channel + channel_->Push(record); + } else { // Write directly to socket + auto ec = WriteRecord(io::Buffer(record)); + if (ec) { + cntx->ReportError(ec); + } + } +} + +void RdbSaver::Impl::FinalizeSnapshotWriting() { + if (channel_) { + channel_->StartClosing(); + } +} + +void RdbSaver::Impl::StartSnapshotting(bool stream_journal, Context* cntx, EngineShard* shard) { auto& s = GetSnapshot(shard); auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()); - s = std::make_unique(&db_slice, &channel_, compression_mode_); + auto on_snapshot_finish = std::bind(&RdbSaver::Impl::FinalizeSnapshotWriting, this); + auto push_cb = std::bind(&RdbSaver::Impl::PushSnapshotData, this, cntx, std::placeholders::_1); + + s = std::make_unique(&db_slice, compression_mode_, push_cb, on_snapshot_finish); const auto allow_flush = (save_mode_ != SaveMode::RDB) ? SliceSnapshot::SnapshotFlush::kAllow : SliceSnapshot::SnapshotFlush::kDisallow; - s->Start(stream_journal, cll, allow_flush); + + s->Start(stream_journal, cntx->GetCancellation(), allow_flush); } void RdbSaver::Impl::StartIncrementalSnapshotting(Context* cntx, EngineShard* shard, LSN start_lsn) { auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()); auto& s = GetSnapshot(shard); - s = std::make_unique(&db_slice, &channel_, compression_mode_); + auto on_finalize_cb = std::bind(&RdbSaver::Impl::FinalizeSnapshotWriting, this); + auto push_cb = std::bind(&RdbSaver::Impl::PushSnapshotData, this, cntx, std::placeholders::_1); + s = std::make_unique(&db_slice, compression_mode_, push_cb, on_finalize_cb); s->StartIncremental(cntx, start_lsn); } +// called on save flow +void RdbSaver::Impl::WaitForSnapshottingFinish(EngineShard* shard) { + auto& snapshot = GetSnapshot(shard); + CHECK(snapshot); + snapshot->WaitSnapshotting(); +} + +// called from replication flow void RdbSaver::Impl::StopSnapshotting(EngineShard* shard) { - GetSnapshot(shard)->FinalizeJournalStream(false); + auto& snapshot = GetSnapshot(shard); + CHECK(snapshot); + snapshot->FinalizeJournalStream(false); } void RdbSaver::Impl::CancelInShard(EngineShard* shard) { - GetSnapshot(shard)->FinalizeJournalStream(true); + auto& snapshot = GetSnapshot(shard); + if (snapshot) { // Cancel can be called before snapshotting started. + snapshot->FinalizeJournalStream(true); + } } // This function is called from connection thread when info command is invoked. @@ -1314,7 +1371,8 @@ size_t RdbSaver::Impl::GetTotalBuffersSize() const { auto cb = [this, &channel_bytes, &serializer_bytes](ShardId sid) { auto& snapshot = shard_snapshots_[sid]; - channel_bytes.fetch_add(snapshot->GetTotalChannelCapacity(), memory_order_relaxed); + if (channel_.has_value()) + channel_bytes.fetch_add(channel_->GetSize(), memory_order_relaxed); serializer_bytes.store(snapshot->GetBufferCapacity() + snapshot->GetTempBuffersSize(), memory_order_relaxed); }; @@ -1437,17 +1495,22 @@ RdbSaver::~RdbSaver() { tlocal->DecommitMemory(ServerState::kAllMemory); } -void RdbSaver::StartSnapshotInShard(bool stream_journal, const Cancellation* cll, - EngineShard* shard) { - impl_->StartSnapshotting(stream_journal, cll, shard); +void RdbSaver::StartSnapshotInShard(bool stream_journal, Context* cntx, EngineShard* shard) { + impl_->StartSnapshotting(stream_journal, cntx, shard); } void RdbSaver::StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard, LSN start_lsn) { impl_->StartIncrementalSnapshotting(cntx, shard, start_lsn); } -void RdbSaver::StopFullSyncInShard(EngineShard* shard) { +error_code RdbSaver::WaitSnapshotInShard(EngineShard* shard) { + impl_->WaitForSnapshottingFinish(shard); + return SaveEpilog(); +} + +error_code RdbSaver::StopFullSyncInShard(EngineShard* shard) { impl_->StopSnapshotting(shard); + return SaveEpilog(); } error_code RdbSaver::SaveHeader(const GlobalData& glob_state) { @@ -1459,16 +1522,14 @@ error_code RdbSaver::SaveHeader(const GlobalData& glob_state) { RETURN_ON_ERR(impl_->serializer()->WriteRaw(Bytes{reinterpret_cast(magic), sz})); RETURN_ON_ERR(SaveAux(std::move(glob_state))); - + RETURN_ON_ERR(impl_->FlushSerializer()); return error_code{}; } -error_code RdbSaver::SaveBody(Context* cntx, RdbTypeFreqMap* freq_map) { +error_code RdbSaver::SaveBody(Context* cntx) { RETURN_ON_ERR(impl_->FlushSerializer()); - if (save_mode_ == SaveMode::SUMMARY) { - impl_->serializer()->SendFullSyncCut(); - } else { + if (save_mode_ == SaveMode::RDB) { VLOG(1) << "SaveBody , snapshots count: " << impl_->Size(); error_code io_error = impl_->ConsumeChannel(cntx->GetCancellation()); if (io_error) { @@ -1477,16 +1538,16 @@ error_code RdbSaver::SaveBody(Context* cntx, RdbTypeFreqMap* freq_map) { if (cntx->GetError()) { return cntx->GetError(); } + } else { + DCHECK(save_mode_ == SaveMode::SUMMARY); } - RETURN_ON_ERR(SaveEpilog()); + return SaveEpilog(); +} - if (freq_map) { - freq_map->clear(); - impl_->FillFreqMap(freq_map); - } - - return error_code{}; +void RdbSaver::FillFreqMap(RdbTypeFreqMap* freq_map) { + freq_map->clear(); + impl_->FillFreqMap(freq_map); } error_code RdbSaver::SaveAux(const GlobalData& glob_state) { @@ -1523,20 +1584,7 @@ error_code RdbSaver::SaveAux(const GlobalData& glob_state) { } error_code RdbSaver::SaveEpilog() { - uint8_t buf[8]; - uint64_t chksum; - - auto& ser = *impl_->serializer(); - - /* EOF opcode */ - RETURN_ON_ERR(ser.WriteOpcode(RDB_OPCODE_EOF)); - - /* CRC64 checksum. It will be zero if checksum computation is disabled, the - * loading code skips the check in this case. */ - chksum = 0; - - absl::little_endian::Store64(buf, chksum); - RETURN_ON_ERR(ser.WriteRaw(buf)); + RETURN_ON_ERR(impl_->serializer()->SendEofAndChecksum()); RETURN_ON_ERR(impl_->FlushSerializer()); diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 90ca435c7..5d889d21f 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -89,21 +89,26 @@ class RdbSaver { // Initiates the serialization in the shard's thread. // cll allows breaking in the middle. - void StartSnapshotInShard(bool stream_journal, const Cancellation* cll, EngineShard* shard); + void StartSnapshotInShard(bool stream_journal, Context* cntx, EngineShard* shard); // Send only the incremental snapshot since start_lsn. void StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard, LSN start_lsn); // Stops full-sync serialization for replication in the shard's thread. - void StopFullSyncInShard(EngineShard* shard); + std::error_code StopFullSyncInShard(EngineShard* shard); + + // Wait for snapshotting finish in shard thread. Called from save flows in shard thread. + std::error_code WaitSnapshotInShard(EngineShard* shard); // Stores auxiliary (meta) values and header_info std::error_code SaveHeader(const GlobalData& header_info); // Writes the RDB file into sink. Waits for the serialization to finish. + // Called only for save rdb flow and save df on summary file. + std::error_code SaveBody(Context* cntx); + // Fills freq_map with the histogram of rdb types. - // freq_map can optionally be null. - std::error_code SaveBody(Context* cntx, RdbTypeFreqMap* freq_map); + void FillFreqMap(RdbTypeFreqMap* freq_map); void CancelInShard(EngineShard* shard); @@ -232,6 +237,7 @@ class RdbSerializer : public SerializerBase { std::error_code SendJournalOffset(uint64_t journal_offset); size_t GetTempBufferSize() const override; + std::error_code SendEofAndChecksum(); private: // Might preempt if flush_fun_ is used diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 810cf6947..745233552 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -32,16 +32,17 @@ using facade::operator""_KB; namespace { thread_local absl::flat_hash_set tl_slice_snapshots; -constexpr size_t kMinChannelBlobSize = 32_KB; +constexpr size_t kMinBlobSize = 32_KB; } // namespace -size_t SliceSnapshot::DbRecord::size() const { - return HeapSize(value); -} - -SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode) - : db_slice_(slice), dest_(dest), compression_mode_(compression_mode) { +SliceSnapshot::SliceSnapshot(DbSlice* slice, CompressionMode compression_mode, + std::function on_push_record, + std::function on_snapshot_finish) + : db_slice_(slice), + compression_mode_(compression_mode), + on_push_(on_push_record), + on_snapshot_finish_(on_snapshot_finish) { db_array_ = slice->databases(); tl_slice_snapshots.insert(this); } @@ -53,7 +54,7 @@ SliceSnapshot::~SliceSnapshot() { size_t SliceSnapshot::GetThreadLocalMemoryUsage() { size_t mem = 0; for (SliceSnapshot* snapshot : tl_slice_snapshots) { - mem += snapshot->GetBufferCapacity() + snapshot->GetTotalChannelCapacity(); + mem += snapshot->GetBufferCapacity(); } return mem; } @@ -81,8 +82,8 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot flush_fun = [this, flush_threshold](size_t bytes_serialized, RdbSerializer::FlushState flush_state) { if (bytes_serialized > flush_threshold) { - size_t serialized = FlushChannelRecord(flush_state); - VLOG(2) << "FlushedToChannel " << serialized << " bytes"; + size_t serialized = FlushSerialized(flush_state); + VLOG(2) << "FlushSerialized " << serialized << " bytes"; } }; } @@ -93,11 +94,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal, cll] { IterateBucketsFb(cll, stream_journal); db_slice_->UnregisterOnChange(snapshot_version_); - // We stop the channel if we are performing backups (non-streaming). - // We keep the channel for replication in order to send jounal changes until we finalize. - if (!stream_journal) { - CloseRecordChannel(); - } + on_snapshot_finish_(); }); } @@ -127,10 +124,8 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) { journal->UnregisterOnChange(cb_id); if (!cancel) { serializer_->SendJournalOffset(journal->GetLsn()); - PushSerializedToChannel(true); + PushSerialized(true); } - - CloseRecordChannel(); } // The algorithm is to go over all the buckets and serialize those with @@ -175,7 +170,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn PrimeTable::Cursor next = db_slice_->Traverse(pt, cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this)); cursor = next; - PushSerializedToChannel(false); + PushSerialized(false); if (stats_.loop_serialized >= last_yield + 100) { DVLOG(2) << "Before sleep " << ThisFiber::GetName(); @@ -185,18 +180,18 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn last_yield = stats_.loop_serialized; // Push in case other fibers (writes commands that pushed previous values) // filled the buffer. - PushSerializedToChannel(false); + PushSerialized(false); } } while (cursor); DVLOG(2) << "after loop " << ThisFiber::GetName(); - PushSerializedToChannel(true); + PushSerialized(true); } // for (dbindex) CHECK(!serialize_bucket_running_); if (send_full_sync_cut) { CHECK(!serializer_->SendFullSyncCut()); - PushSerializedToChannel(true); + PushSerialized(true); } // serialized + side_saved must be equal to the total saved. @@ -214,7 +209,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) { // The replica sends the LSN of the next entry is wants to receive. while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) { serializer_->WriteJournalEntry(journal->GetEntry(lsn)); - PushSerializedToChannel(false); + PushSerialized(false); lsn++; } @@ -236,7 +231,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) { } auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this); journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb)); - PushSerializedToChannel(true); + PushSerialized(true); } else { // We stopped but we didn't manage to send the whole stream. cntx->ReportError( @@ -321,7 +316,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr } } -size_t SliceSnapshot::FlushChannelRecord(SerializerBase::FlushState flush_state) { +size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) { io::StringFile sfile; serializer_->FlushToSink(&sfile, flush_state); @@ -331,34 +326,34 @@ size_t SliceSnapshot::FlushChannelRecord(SerializerBase::FlushState flush_state) uint64_t id = rec_id_++; DVLOG(2) << "Pushing " << id; - DbRecord db_rec{.id = id, .value = std::move(sfile.val)}; + fb2::NoOpLock lk; // We create a critical section here that ensures that records are pushed in sequential order. - // As a result, it is not possible for two fiber producers to push into channel concurrently. + // As a result, it is not possible for two fiber producers to push concurrently. // If A.id = 5, and then B.id = 6, and both are blocked here, it means that last_pushed_id_ < 4. // Once last_pushed_id_ = 4, A will be unblocked, while B will wait until A finishes pushing and // update last_pushed_id_ to 5. seq_cond_.wait(lk, [&] { return id == this->last_pushed_id_ + 1; }); // Blocking point. - size_t channel_usage = dest_->Push(std::move(db_rec)); + on_push_(std::move(sfile.val)); + DCHECK_EQ(last_pushed_id_ + 1, id); last_pushed_id_ = id; seq_cond_.notify_all(); - VLOG(2) << "Pushed with Serialize() " << serialized - << " bytes, channel total usage: " << channel_usage; + VLOG(2) << "Pushed with Serialize() " << serialized; return serialized; } -bool SliceSnapshot::PushSerializedToChannel(bool force) { - if (!force && serializer_->SerializedLen() < kMinChannelBlobSize) +bool SliceSnapshot::PushSerialized(bool force) { + if (!force && serializer_->SerializedLen() < kMinBlobSize) return false; // Flush any of the leftovers to avoid interleavings - size_t serialized = FlushChannelRecord(FlushState::kFlushMidEntry); + size_t serialized = FlushSerialized(FlushState::kFlushMidEntry); if (!delayed_entries_.empty()) { // Async bucket serialization might have accumulated some delayed values. @@ -371,7 +366,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) { } while (!delayed_entries_.empty()); // blocking point. - serialized += FlushChannelRecord(FlushState::kFlushMidEntry); + serialized += FlushSerialized(FlushState::kFlushMidEntry); } return serialized > 0; } @@ -400,7 +395,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) { // To enable journal flushing to sync after non auto journal command is executed we call // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no - // additional journal change to serialize, it simply invokes PushSerializedToChannel. + // additional journal change to serialize, it simply invokes PushSerialized. std::unique_lock lk(db_slice_->GetSerializationMutex()); if (item.opcode != journal::Op::NOOP) { serializer_->WriteJournalEntry(item.data); @@ -409,19 +404,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) if (await) { // This is the only place that flushes in streaming mode // once the iterate buckets fiber finished. - PushSerializedToChannel(false); - } -} - -void SliceSnapshot::CloseRecordChannel() { - DCHECK(db_slice_->shard_owner()->IsMyThread()); - std::unique_lock lk(db_slice_->GetSerializationMutex()); - - CHECK(!serialize_bucket_running_); - // Make sure we close the channel only once with a CAS check. - bool expected = false; - if (closed_chan_.compare_exchange_strong(expected, true)) { - dest_->StartClosing(); + PushSerialized(false); } } @@ -433,10 +416,6 @@ size_t SliceSnapshot::GetBufferCapacity() const { return serializer_->GetBufferCapacity(); } -size_t SliceSnapshot::GetTotalChannelCapacity() const { - return dest_->GetSize(); -} - size_t SliceSnapshot::GetTempBuffersSize() const { if (serializer_ == nullptr) { return 0; diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 9c5fd976c..38ad86c88 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -8,7 +8,6 @@ #include #include "base/pod_array.h" -#include "core/size_tracking_channel.h" #include "io/file.h" #include "server/common.h" #include "server/db_slice.h" @@ -31,35 +30,27 @@ struct Entry; // │ SerializeBucket │ Both might fall back to a temporary serializer // └────────────┬─────────────┘ if default is used on another db index // │ -// | Channel is left open in journal streaming mode +// | Socket is left open in journal streaming mode // ▼ // ┌──────────────────────────┐ ┌─────────────────────────┐ // │ SerializeEntry │ ◄────────┤ OnJournalEntry │ // └─────────────┬────────────┘ └─────────────────────────┘ // │ -// PushBytesToChannel Default buffer gets flushed on iteration, -// │ temporary on destruction +// PushBytes Default buffer gets flushed on iteration, +// │ temporary on destruction // ▼ // ┌──────────────────────────────┐ -// │ dest->Push(buffer) │ +// │ push_cb(buffer) │ // └──────────────────────────────┘ // SliceSnapshot is used for iterating over a shard at a specified point-in-time -// and submitting all values to an output channel. +// and submitting all values to an output sink. // In journal streaming mode, the snapshot continues submitting changes -// over the channel until explicitly stopped. +// over the sink until explicitly stopped. class SliceSnapshot { public: - struct DbRecord { - uint64_t id; - std::string value; - - size_t size() const; - }; - - using RecordChannel = SizeTrackingChannel>; - - SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode); + SliceSnapshot(DbSlice* slice, CompressionMode compression_mode, + std::function on_push, std::function on_snapshot_finish); ~SliceSnapshot(); static size_t GetThreadLocalMemoryUsage(); @@ -85,7 +76,7 @@ class SliceSnapshot { // Waits for a regular, non journal snapshot to finish. // Called only for non-replication, backups usecases. - void Join() { + void WaitSnapshotting() { snapshot_fb_.JoinIfNeeded(); } @@ -114,18 +105,15 @@ class SliceSnapshot { // Journal listener void OnJournalEntry(const journal::JournalItem& item, bool unused_await_arg); - // Close dest channel if not closed yet. - void CloseRecordChannel(); - - // Push serializer's internal buffer to channel. + // Push serializer's internal buffer. // Push regardless of buffer size if force is true. // Return true if pushed. Can block. Is called from the snapshot thread. - bool PushSerializedToChannel(bool force); + bool PushSerialized(bool force); // Helper function that flushes the serialized items into the RecordStream. - // Can block on the channel. + // Can block. using FlushState = SerializerBase::FlushState; - size_t FlushChannelRecord(FlushState flush_state); + size_t FlushSerialized(FlushState flush_state); public: uint64_t snapshot_version() const { @@ -138,7 +126,6 @@ class SliceSnapshot { // Get different sizes, in bytes. All disjoint. size_t GetBufferCapacity() const; - size_t GetTotalChannelCapacity() const; size_t GetTempBuffersSize() const; RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const; @@ -156,9 +143,6 @@ class SliceSnapshot { DbSlice* db_slice_; DbTableArray db_array_; - RecordChannel* dest_; - std::atomic_bool closed_chan_{false}; // true if dest_->StartClosing was already called - DbIndex current_db_; std::unique_ptr serializer_; @@ -184,6 +168,9 @@ class SliceSnapshot { size_t savecb_calls = 0; size_t keys_total = 0; } stats_; + + std::function on_push_; + std::function on_snapshot_finish_; }; } // namespace dfly diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index e7062f48d..622b07215 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -439,7 +439,7 @@ class TestDflySnapshotOnShutdown: @pytest.mark.parametrize("format", FILE_FORMATS) @dfly_args({**BASIC_ARGS, "dbfilename": "info-while-snapshot"}) -async def test_infomemory_while_snapshoting(df_factory, format: str): +async def test_infomemory_while_snapshotting(df_factory, format: str): instance = df_factory.create(dbfilename=f"dump_{tmp_file_name()}") instance.start() async_client = instance.client()