1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

bug(snapshot) : enforce order when pushing to channel when needed issue #879 (#886)

* bug(rdb_save): snapshotting records pushed to channel can be OOO 
fixes #879

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-03-02 09:15:02 +02:00 committed by GitHub
parent ccc784d9c4
commit 15ee7224a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -11,6 +11,7 @@
#include <zstd.h> #include <zstd.h>
#include <jsoncons/json.hpp> #include <jsoncons/json.hpp>
#include <queue>
extern "C" { extern "C" {
#include "redis/intset.h" #include "redis/intset.h"
@ -832,10 +833,48 @@ error_code AlignedBuffer::Flush() {
} }
class RdbSaver::Impl { class RdbSaver::Impl {
private:
// This is a helper struct to pop records from channel while enfocing returned records order.
struct RecordsPopper {
RecordsPopper(bool enforce_order, SliceSnapshot::RecordChannel* c)
: enforce_order(enforce_order), channel(c) {
}
// Blocking function, pops from channel.
// If enforce_order is enabled return the records by order.
// returns nullopt if channel was closed.
std::optional<SliceSnapshot::DbRecord> Pop();
// Non blocking function, trys to pop from channel.
// If enforce_order is enabled return the records by order.
// returns nullopt if nothing in channel.
std::optional<SliceSnapshot::DbRecord> TryPop();
private:
std::optional<SliceSnapshot::DbRecord> InternalPop(bool blocking);
// Checks if next record is in queue, if so set record_holder and return true, otherwise
// return false.
bool TryPopFromQueue();
struct Compare {
bool operator()(const SliceSnapshot::DbRecord& a, const SliceSnapshot::DbRecord& b) {
return a.id > b.id;
}
};
// min heap holds the DbRecord that poped from channel OOO
std::priority_queue<SliceSnapshot::DbRecord, std::vector<SliceSnapshot::DbRecord>, Compare>
q_records;
uint64_t next_record_id = 0;
bool enforce_order;
SliceSnapshot::RecordChannel* channel;
SliceSnapshot::DbRecord record_holder;
};
public: public:
// We pass K=sz to say how many producers are pushing data in order to maintain // We pass K=sz to say how many producers are pushing data in order to maintain
// correct closing semantics - channel is closing when K producers marked it as closed. // correct closing semantics - channel is closing when K producers marked it as closed.
Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode, io::Sink* sink); Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode,
SaveMode save_mode, io::Sink* sink);
void StartSnapshotting(bool stream_journal, const Cancellation* cll, EngineShard* shard); void StartSnapshotting(bool stream_journal, const Cancellation* cll, EngineShard* shard);
@ -875,6 +914,7 @@ class RdbSaver::Impl {
// used for serializing non-body components in the calling fiber. // used for serializing non-body components in the calling fiber.
RdbSerializer meta_serializer_; RdbSerializer meta_serializer_;
SliceSnapshot::RecordChannel channel_; SliceSnapshot::RecordChannel channel_;
bool push_to_sink_with_order_ = false;
std::optional<AlignedBuffer> aligned_buf_; std::optional<AlignedBuffer> aligned_buf_;
CompressionMode CompressionMode
compression_mode_; // Single entry compression is compatible with redis rdb snapshot compression_mode_; // Single entry compression is compatible with redis rdb snapshot
@ -885,7 +925,7 @@ class RdbSaver::Impl {
// We pass K=sz to say how many producers are pushing data in order to maintain // We pass K=sz to say how many producers are pushing data in order to maintain
// correct closing semantics - channel is closing when K producers marked it as closed. // correct closing semantics - channel is closing when K producers marked it as closed.
RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode, RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode,
io::Sink* sink) SaveMode sm, io::Sink* sink)
: sink_(sink), shard_snapshots_(producers_len), : sink_(sink), shard_snapshots_(producers_len),
meta_serializer_(CompressionMode::NONE), // Note: I think there is not need for compression meta_serializer_(CompressionMode::NONE), // Note: I think there is not need for compression
// at all in meta serializer // at all in meta serializer
@ -894,6 +934,9 @@ RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode
aligned_buf_.emplace(kBufLen, sink); aligned_buf_.emplace(kBufLen, sink);
sink_ = &aligned_buf_.value(); sink_ = &aligned_buf_.value();
} }
if (sm == SaveMode::SINGLE_SHARD) {
push_to_sink_with_order_ = true;
}
DCHECK(producers_len > 0 || channel_.IsClosing()); DCHECK(producers_len > 0 || channel_.IsClosing());
} }
@ -907,17 +950,58 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val)
return error_code{}; return error_code{};
} }
bool RdbSaver::Impl::RecordsPopper::TryPopFromQueue() {
if (enforce_order && !q_records.empty() && q_records.top().id == next_record_id) {
record_holder = std::move(const_cast<SliceSnapshot::DbRecord&>(q_records.top()));
q_records.pop();
++next_record_id;
return true;
}
return false;
}
std::optional<SliceSnapshot::DbRecord> RdbSaver::Impl::RecordsPopper::Pop() {
return InternalPop(true);
}
std::optional<SliceSnapshot::DbRecord> RdbSaver::Impl::RecordsPopper::TryPop() {
return InternalPop(false);
}
std::optional<SliceSnapshot::DbRecord> RdbSaver::Impl::RecordsPopper::InternalPop(bool blocking) {
if (TryPopFromQueue()) {
return std::move(record_holder);
}
auto pop_fn =
blocking ? &SliceSnapshot::RecordChannel::Pop : &SliceSnapshot::RecordChannel::TryPop;
while ((channel->*pop_fn)(record_holder)) {
if (!enforce_order) {
return std::move(record_holder);
}
if (record_holder.id == next_record_id) {
++next_record_id;
return std::move(record_holder);
}
// record popped from channel is ooo, push to queue
q_records.emplace(std::move(record_holder));
}
return std::nullopt;
}
error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
error_code io_error; error_code io_error;
size_t channel_bytes = 0; size_t channel_bytes = 0;
SliceSnapshot::DbRecord record; std::optional<SliceSnapshot::DbRecord> record;
RecordsPopper records_popper(push_to_sink_with_order_, &channel_);
// we can not exit on io-error since we spawn fibers that push data. // we can not exit on io-error since we spawn fibers that push data.
// TODO: we may signal them to stop processing and exit asap in case of the error. // TODO: we may signal them to stop processing and exit asap in case of the error.
auto& channel = channel_; while (record = records_popper.Pop()) {
while (channel.Pop(record)) {
if (io_error || cll->IsCancelled()) if (io_error || cll->IsCancelled())
continue; continue;
@ -925,14 +1009,15 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
if (cll->IsCancelled()) if (cll->IsCancelled())
continue; continue;
DVLOG(2) << "Pulled " << record.id; DVLOG(2) << "Pulled " << record->id;
channel_bytes += record.value.size(); channel_bytes += record->value.size();
io_error = sink_->Write(io::Buffer(record.value)); io_error = sink_->Write(io::Buffer(record->value));
if (io_error) {
record.value.clear(); break;
} while (!io_error && channel.TryPop(record)); }
} // while (channel.pop) } while (record = records_popper.TryPop());
} // while (records_popper.Pop())
size_t pushed_bytes = 0; size_t pushed_bytes = 0;
for (auto& ptr : shard_snapshots_) { for (auto& ptr : shard_snapshots_) {
@ -940,7 +1025,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
pushed_bytes += ptr->channel_bytes(); pushed_bytes += ptr->channel_bytes();
} }
DCHECK(!channel.TryPop(record)); DCHECK(!channel_.TryPop(*record));
VLOG(1) << "Channel pulled bytes: " << channel_bytes << " pushed bytes: " << pushed_bytes; VLOG(1) << "Channel pulled bytes: " << channel_bytes << " pushed bytes: " << pushed_bytes;
@ -1026,7 +1111,7 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) {
break; break;
} }
VLOG(1) << "Rdb save using compression mode:" << uint32_t(compression_mode_); VLOG(1) << "Rdb save using compression mode:" << uint32_t(compression_mode_);
impl_.reset(new Impl(align_writes, producer_count, compression_mode_, sink)); impl_.reset(new Impl(align_writes, producer_count, compression_mode_, save_mode, sink));
save_mode_ = save_mode; save_mode_ = save_mode;
} }