diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 1959cf870..48e5a3a4a 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -11,6 +11,7 @@ #include #include +#include extern "C" { #include "redis/intset.h" @@ -832,10 +833,48 @@ error_code AlignedBuffer::Flush() { } class RdbSaver::Impl { + private: + // This is a helper struct to pop records from channel while enfocing returned records order. + struct RecordsPopper { + RecordsPopper(bool enforce_order, SliceSnapshot::RecordChannel* c) + : enforce_order(enforce_order), channel(c) { + } + + // Blocking function, pops from channel. + // If enforce_order is enabled return the records by order. + // returns nullopt if channel was closed. + std::optional Pop(); + + // Non blocking function, trys to pop from channel. + // If enforce_order is enabled return the records by order. + // returns nullopt if nothing in channel. + std::optional TryPop(); + + private: + std::optional InternalPop(bool blocking); + // Checks if next record is in queue, if so set record_holder and return true, otherwise + // return false. + bool TryPopFromQueue(); + + struct Compare { + bool operator()(const SliceSnapshot::DbRecord& a, const SliceSnapshot::DbRecord& b) { + return a.id > b.id; + } + }; + // min heap holds the DbRecord that poped from channel OOO + std::priority_queue, Compare> + q_records; + uint64_t next_record_id = 0; + bool enforce_order; + SliceSnapshot::RecordChannel* channel; + SliceSnapshot::DbRecord record_holder; + }; + public: // We pass K=sz to say how many producers are pushing data in order to maintain // correct closing semantics - channel is closing when K producers marked it as closed. - Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode, io::Sink* sink); + Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode, + SaveMode save_mode, io::Sink* sink); void StartSnapshotting(bool stream_journal, const Cancellation* cll, EngineShard* shard); @@ -875,6 +914,7 @@ class RdbSaver::Impl { // used for serializing non-body components in the calling fiber. RdbSerializer meta_serializer_; SliceSnapshot::RecordChannel channel_; + bool push_to_sink_with_order_ = false; std::optional aligned_buf_; CompressionMode compression_mode_; // Single entry compression is compatible with redis rdb snapshot @@ -885,7 +925,7 @@ class RdbSaver::Impl { // We pass K=sz to say how many producers are pushing data in order to maintain // correct closing semantics - channel is closing when K producers marked it as closed. RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode, - io::Sink* sink) + SaveMode sm, io::Sink* sink) : sink_(sink), shard_snapshots_(producers_len), meta_serializer_(CompressionMode::NONE), // Note: I think there is not need for compression // at all in meta serializer @@ -894,6 +934,9 @@ RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode aligned_buf_.emplace(kBufLen, sink); sink_ = &aligned_buf_.value(); } + if (sm == SaveMode::SINGLE_SHARD) { + push_to_sink_with_order_ = true; + } DCHECK(producers_len > 0 || channel_.IsClosing()); } @@ -907,17 +950,58 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val) return error_code{}; } +bool RdbSaver::Impl::RecordsPopper::TryPopFromQueue() { + if (enforce_order && !q_records.empty() && q_records.top().id == next_record_id) { + record_holder = std::move(const_cast(q_records.top())); + q_records.pop(); + ++next_record_id; + return true; + } + return false; +} + +std::optional RdbSaver::Impl::RecordsPopper::Pop() { + return InternalPop(true); +} + +std::optional RdbSaver::Impl::RecordsPopper::TryPop() { + return InternalPop(false); +} + +std::optional RdbSaver::Impl::RecordsPopper::InternalPop(bool blocking) { + if (TryPopFromQueue()) { + return std::move(record_holder); + } + + auto pop_fn = + blocking ? &SliceSnapshot::RecordChannel::Pop : &SliceSnapshot::RecordChannel::TryPop; + + while ((channel->*pop_fn)(record_holder)) { + if (!enforce_order) { + return std::move(record_holder); + } + if (record_holder.id == next_record_id) { + ++next_record_id; + return std::move(record_holder); + } + // record popped from channel is ooo, push to queue + q_records.emplace(std::move(record_holder)); + } + return std::nullopt; +} + error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { error_code io_error; size_t channel_bytes = 0; - SliceSnapshot::DbRecord record; + std::optional record; + + RecordsPopper records_popper(push_to_sink_with_order_, &channel_); // we can not exit on io-error since we spawn fibers that push data. // TODO: we may signal them to stop processing and exit asap in case of the error. - auto& channel = channel_; - while (channel.Pop(record)) { + while (record = records_popper.Pop()) { if (io_error || cll->IsCancelled()) continue; @@ -925,14 +1009,15 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { if (cll->IsCancelled()) continue; - DVLOG(2) << "Pulled " << record.id; - channel_bytes += record.value.size(); + DVLOG(2) << "Pulled " << record->id; + channel_bytes += record->value.size(); - io_error = sink_->Write(io::Buffer(record.value)); - - record.value.clear(); - } while (!io_error && channel.TryPop(record)); - } // while (channel.pop) + io_error = sink_->Write(io::Buffer(record->value)); + if (io_error) { + break; + } + } while (record = records_popper.TryPop()); + } // while (records_popper.Pop()) size_t pushed_bytes = 0; for (auto& ptr : shard_snapshots_) { @@ -940,7 +1025,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { pushed_bytes += ptr->channel_bytes(); } - DCHECK(!channel.TryPop(record)); + DCHECK(!channel_.TryPop(*record)); VLOG(1) << "Channel pulled bytes: " << channel_bytes << " pushed bytes: " << pushed_bytes; @@ -1026,7 +1111,7 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) { break; } VLOG(1) << "Rdb save using compression mode:" << uint32_t(compression_mode_); - impl_.reset(new Impl(align_writes, producer_count, compression_mode_, sink)); + impl_.reset(new Impl(align_writes, producer_count, compression_mode_, save_mode, sink)); save_mode_ = save_mode; }