
feat(server): don't use channel for replication / save df (#4041)

* feat(server): don't use channel for replication / save df

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden 2024-11-05 16:50:01 +02:00 committed by GitHub
parent 7df8c268d8
commit ae3faf59fb
9 changed files with 203 additions and 175 deletions
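
At a high level, the commit removes the channel-based handoff between the per-shard SliceSnapshot producers and the RdbSaver consumer for replication and DF-format saves: each snapshot now hands serialized blobs to a push callback and signals completion through a finish callback, while only the legacy single-file RDB flow keeps an internal channel inside RdbSaver::Impl. The sketch below illustrates that callback shape using only the standard library; the names are illustrative, not the actual dragonfly API.

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-in for a per-shard snapshot producer: instead of
// pushing into a shared channel, it invokes caller-supplied callbacks.
class SnapshotProducer {
 public:
  SnapshotProducer(std::function<void(std::string)> on_push,
                   std::function<void()> on_finish)
      : on_push_(std::move(on_push)), on_finish_(std::move(on_finish)) {}

  void Run() {
    for (int i = 0; i < 3; ++i)
      on_push_("record-" + std::to_string(i));  // serialized blob goes straight out
    on_finish_();  // no more records will be produced
  }

 private:
  std::function<void(std::string)> on_push_;
  std::function<void()> on_finish_;
};

int main() {
  std::vector<std::string> sink;  // stands in for the socket / file sink
  SnapshotProducer producer(
      [&sink](std::string rec) { sink.push_back(std::move(rec)); },  // direct write
      [&sink] { sink.push_back("<finished>"); });                    // finalize
  producer.Run();
  for (const std::string& r : sink)
    std::cout << r << '\n';
}
```
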


@ -124,7 +124,11 @@ GenericError RdbSnapshot::Start(SaveMode save_mode, const std::string& path,
}
error_code RdbSnapshot::SaveBody() {
return saver_->SaveBody(&cntx_, &freq_map_);
return saver_->SaveBody(&cntx_);
}
error_code RdbSnapshot::WaitSnapshotInShard(EngineShard* shard) {
return saver_->WaitSnapshotInShard(shard);
}
size_t RdbSnapshot::GetSaveBuffersSize() {
@ -132,6 +136,10 @@ size_t RdbSnapshot::GetSaveBuffersSize() {
return saver_->GetTotalBuffersSize();
}
void RdbSnapshot::FillFreqMap() {
saver_->FillFreqMap(&freq_map_);
}
RdbSaver::SnapshotStats RdbSnapshot::GetCurrentSnapshotProgress() const {
CHECK(saver_);
return saver_->GetCurrentSnapshotProgress();
@ -147,7 +155,7 @@ error_code RdbSnapshot::Close() {
}
void RdbSnapshot::StartInShard(EngineShard* shard) {
saver_->StartSnapshotInShard(false, cntx_.GetCancellation(), shard);
saver_->StartSnapshotInShard(false, &cntx_, shard);
started_shards_.fetch_add(1, memory_order_relaxed);
}
@ -176,7 +184,12 @@ std::optional<SaveInfo> SaveStagesController::InitResourcesAndStart() {
}
void SaveStagesController::WaitAllSnapshots() {
RunStage(&SaveStagesController::SaveCb);
if (use_dfs_format_) {
shard_set->RunBlockingInParallel([&](EngineShard* shard) { WaitSnapshotInShard(shard); });
SaveBody(shard_set->size());
} else {
SaveBody(0);
}
}
SaveInfo SaveStagesController::Finalize() {
@ -395,13 +408,22 @@ GenericError SaveStagesController::BuildFullPath() {
return {};
}
void SaveStagesController::SaveCb(unsigned index) {
if (auto& snapshot = snapshots_[index].first; snapshot && snapshot->HasStarted())
void SaveStagesController::SaveBody(unsigned index) {
CHECK(!use_dfs_format_ || index == shard_set->size()); // used in rdb and df summary file
if (auto& snapshot = snapshots_[index].first; snapshot && snapshot->HasStarted()) {
shared_err_ = snapshot->SaveBody();
}
}
void SaveStagesController::WaitSnapshotInShard(EngineShard* shard) {
if (auto& snapshot = snapshots_[shard->shard_id()].first; snapshot && snapshot->HasStarted()) {
shared_err_ = snapshot->WaitSnapshotInShard(shard);
}
}
void SaveStagesController::CloseCb(unsigned index) {
if (auto& snapshot = snapshots_[index].first; snapshot) {
snapshot->FillFreqMap();
shared_err_ = snapshot->Close();
unique_lock lk{rdb_name_map_mu_};
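
For the DF format, WaitAllSnapshots no longer drives a per-shard SaveCb through RunStage; it blocks on WaitSnapshotInShard for every shard in parallel and only then writes the summary body, which lives at index shard_set->size(). The legacy RDB path still calls SaveBody(0). A minimal sketch of that ordering, with plain threads standing in for shard fibers (names are illustrative):

```cpp
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

// Illustrative only: every shard finishes its snapshot before the summary
// body is written; the summary slot sits after the per-shard slots.
void WaitSnapshotInShard(std::size_t shard_id) {
  // In the real code this joins the shard's snapshot fiber.
  (void)shard_id;
}

void SaveBody(std::size_t index) {
  std::cout << "writing body for snapshot index " << index << '\n';
}

int main() {
  const std::size_t num_shards = 4;
  std::vector<std::thread> shards;
  for (std::size_t i = 0; i < num_shards; ++i)
    shards.emplace_back(WaitSnapshotInShard, i);
  for (std::thread& t : shards)
    t.join();            // "RunBlockingInParallel": all shards are done here
  SaveBody(num_shards);  // DF format: the summary file uses index == shard count
}
```
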


@ -46,6 +46,8 @@ class RdbSnapshot {
void StartInShard(EngineShard* shard);
error_code SaveBody();
error_code WaitSnapshotInShard(EngineShard* shard);
void FillFreqMap();
error_code Close();
size_t GetSaveBuffersSize();
@ -101,6 +103,8 @@ struct SaveStagesController : public SaveStagesInputs {
// Start saving a dfs file on shard
void SaveDfsSingle(EngineShard* shard);
void SaveSnashot(EngineShard* shard);
void WaitSnapshotInShard(EngineShard* shard);
// Save a single rdb file
void SaveRdb();
@ -115,7 +119,7 @@ struct SaveStagesController : public SaveStagesInputs {
// Build full path: get dir, try creating dirs, get filename with placeholder
GenericError BuildFullPath();
void SaveCb(unsigned index);
void SaveBody(unsigned index);
void CloseCb(unsigned index);


@ -121,7 +121,6 @@ void DflyCmd::ReplicaInfo::Cancel() {
flow->cleanup();
}
VLOG(2) << "After flow cleanup " << shard->shard_id();
flow->full_sync_fb.JoinIfNeeded();
flow->conn = nullptr;
});
// Wait for error handler to quit.
@ -371,7 +370,7 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r
auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {
FlowInfo* flow = &replica_ptr->flows[shard->shard_id()];
StopFullSyncInThread(flow, shard);
StopFullSyncInThread(flow, &replica_ptr->cntx, shard);
status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
};
shard_set->RunBlockingInParallel(std::move(cb));
@ -551,7 +550,6 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
}
OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
DCHECK(!flow->full_sync_fb.IsJoinable());
DCHECK(shard);
DCHECK(flow->conn);
@ -569,7 +567,6 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
// callbacks are blocked on trying to insert to channel.
flow->TryShutdownSocket();
flow->saver->CancelInShard(shard); // stops writing to journal stream to channel
flow->full_sync_fb.JoinIfNeeded(); // finishes poping data from channel
flow->saver.reset();
};
@ -588,18 +585,24 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
if (flow->start_partial_sync_at.has_value())
saver->StartIncrementalSnapshotInShard(cntx, shard, *flow->start_partial_sync_at);
else
saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard);
saver->StartSnapshotInShard(true, cntx, shard);
flow->full_sync_fb = fb2::Fiber("full_sync", &DflyCmd::FullSyncFb, this, flow, cntx);
return OpStatus::OK;
}
void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
DCHECK(shard);
flow->saver->StopFullSyncInShard(shard);
error_code ec = flow->saver->StopFullSyncInShard(shard);
if (ec) {
cntx->ReportError(ec);
return;
}
// Wait for full sync to finish.
flow->full_sync_fb.JoinIfNeeded();
ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
cntx->ReportError(ec);
return;
}
// Reset cleanup and saver
flow->cleanup = []() {};
@ -626,23 +629,6 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS
return OpStatus::OK;
}
void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
error_code ec;
if (ec = flow->saver->SaveBody(cntx, nullptr); ec) {
if (!flow->conn->socket()->IsOpen())
ec = make_error_code(errc::operation_canceled); // we cancelled the operation.
cntx->ReportError(ec);
return;
}
ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
cntx->ReportError(ec);
return;
}
}
auto DflyCmd::CreateSyncSession(ConnectionState* state) -> std::pair<uint32_t, unsigned> {
util::fb2::LockGuard lk(mu_);
unsigned sync_id = next_sync_id_++;
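
The dedicated FullSyncFb fiber is gone: the snapshot body is no longer drained from a channel by a separate fiber, so StopFullSyncInThread now stops the shard's saver, joins the short-lived fiber, writes the EOF token to the replica connection itself, and surfaces failures through the replication Context. A simplified sketch of that inline error propagation (all types here are stand-ins):

```cpp
#include <iostream>
#include <string>
#include <system_error>

// Stand-in for the replication context: it remembers the first error only.
struct ContextSketch {
  std::error_code first_error;
  void ReportError(std::error_code ec) {
    if (!first_error)
      first_error = ec;
  }
};

std::error_code StopSaver() { return {}; }  // pretend the saver stopped cleanly

std::error_code WriteEofToken(const std::string& token) {
  std::cout << "sent eof token " << token << '\n';  // pretend socket write
  return {};
}

void StopFullSync(ContextSketch* cntx, const std::string& eof_token) {
  if (std::error_code ec = StopSaver(); ec) {
    cntx->ReportError(ec);
    return;  // do not send the EOF token after a failed stop
  }
  if (std::error_code ec = WriteEofToken(eof_token); ec) {
    cntx->ReportError(ec);
    return;
  }
  // On success the caller switches the flow to stable sync.
}

int main() {
  ContextSketch cntx;
  StopFullSync(&cntx, "token-123");
  std::cout << (cntx.first_error ? "full sync stop failed" : "full sync stopped") << '\n';
}
```
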


@ -39,7 +39,6 @@ struct FlowInfo {
facade::Connection* conn = nullptr;
util::fb2::Fiber full_sync_fb; // Full sync fiber.
std::unique_ptr<RdbSaver> saver; // Saver for full sync phase.
std::unique_ptr<JournalStreamer> streamer; // Streamer for stable sync phase
std::string eof_token;
@ -210,14 +209,11 @@ class DflyCmd {
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
// Stop full sync in thread. Run state switch cleanup.
void StopFullSyncInThread(FlowInfo* flow, EngineShard* shard);
void StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
// Start stable sync in thread. Called for each flow.
facade::OpStatus StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
// Fiber that runs full sync for each flow.
void FullSyncFb(FlowInfo* flow, Context* cntx);
// Get ReplicaInfo by sync_id.
std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id) ABSL_LOCKS_EXCLUDED(mu_);


@ -29,6 +29,7 @@ extern "C" {
#include "base/logging.h"
#include "core/bloom.h"
#include "core/json/json_object.h"
#include "core/size_tracking_channel.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
@ -834,6 +835,20 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
return error_code{};
}
error_code RdbSerializer::SendEofAndChecksum() {
VLOG(2) << "SendEof";
/* EOF opcode */
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_EOF));
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
uint8_t buf[8];
uint64_t chksum = 0;
absl::little_endian::Store64(buf, chksum);
return WriteRaw(buf);
}
error_code RdbSerializer::SendJournalOffset(uint64_t journal_offset) {
VLOG(2) << "SendJournalOffset";
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_OFFSET));
@ -1107,11 +1122,13 @@ class RdbSaver::Impl {
~Impl();
void StartSnapshotting(bool stream_journal, const Cancellation* cll, EngineShard* shard);
void StartSnapshotting(bool stream_journal, Context* cntx, EngineShard* shard);
void StartIncrementalSnapshotting(Context* cntx, EngineShard* shard, LSN start_lsn);
void StopSnapshotting(EngineShard* shard);
void WaitForSnapshottingFinish(EngineShard* shard);
// used only for legacy rdb save flows.
error_code ConsumeChannel(const Cancellation* cll);
void FillFreqMap(RdbTypeFreqMap* dest) const;
@ -1143,6 +1160,8 @@ class RdbSaver::Impl {
}
private:
void PushSnapshotData(Context* cntx, string record);
void FinalizeSnapshotWriting();
error_code WriteRecord(io::Bytes src);
unique_ptr<SliceSnapshot>& GetSnapshot(EngineShard* shard);
@ -1152,7 +1171,8 @@ class RdbSaver::Impl {
vector<unique_ptr<SliceSnapshot>> shard_snapshots_;
// used for serializing non-body components in the calling fiber.
RdbSerializer meta_serializer_;
SliceSnapshot::RecordChannel channel_;
using RecordChannel = SizeTrackingChannel<string, base::mpmc_bounded_queue<string>>;
std::optional<RecordChannel> channel_;
std::optional<AlignedBuffer> aligned_buf_;
// Single entry compression is compatible with redis rdb snapshot
@ -1170,14 +1190,14 @@ RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode
shard_snapshots_(producers_len),
meta_serializer_(CompressionMode::NONE), // Note: I think there is not need for compression
// at all in meta serializer
channel_{kChannelLen, producers_len},
compression_mode_(compression_mode) {
if (align_writes) {
aligned_buf_.emplace(kBufLen, sink);
sink_ = &aligned_buf_.value();
}
DCHECK(producers_len > 0 || channel_.IsClosing());
if (sm == SaveMode::RDB) {
channel_.emplace(kChannelLen, producers_len);
}
save_mode_ = sm;
}
@ -1213,13 +1233,13 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val)
error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
error_code io_error;
SliceSnapshot::DbRecord record;
string record;
auto& stats = ServerState::tlocal()->stats;
DCHECK(channel_.has_value());
// we can not exit on io-error since we spawn fibers that push data.
// TODO: we may signal them to stop processing and exit asap in case of the error.
while (channel_.Pop(record)) {
while (channel_->Pop(record)) {
if (io_error || cll->IsCancelled())
continue;
@ -1227,9 +1247,8 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
if (cll->IsCancelled())
continue;
DVLOG(2) << "Pulled " << record.id;
auto start = absl::GetCurrentTimeNanos();
io_error = WriteRecord(io::Buffer(record.value));
io_error = WriteRecord(io::Buffer(record));
if (io_error) {
break; // from the inner TryPop loop.
}
@ -1237,15 +1256,15 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
auto delta_usec = (absl::GetCurrentTimeNanos() - start) / 1'000;
stats.rdb_save_usec += delta_usec;
stats.rdb_save_count++;
} while ((channel_.TryPop(record)));
} while ((channel_->TryPop(record)));
} // while (channel_.Pop())
for (auto& ptr : shard_snapshots_) {
ptr->Join();
ptr->WaitSnapshotting();
}
VLOG(1) << "ConsumeChannel finished " << io_error;
DCHECK(!channel_.TryPop(record));
DCHECK(!channel_->TryPop(record));
return io_error;
}
@ -1278,32 +1297,70 @@ error_code RdbSaver::Impl::WriteRecord(io::Bytes src) {
return ec;
}
void RdbSaver::Impl::StartSnapshotting(bool stream_journal, const Cancellation* cll,
EngineShard* shard) {
void RdbSaver::Impl::PushSnapshotData(Context* cntx, string record) {
if (cntx->IsCancelled()) {
return;
}
if (channel_) { // Rdb write to channel
channel_->Push(record);
} else { // Write directly to socket
auto ec = WriteRecord(io::Buffer(record));
if (ec) {
cntx->ReportError(ec);
}
}
}
void RdbSaver::Impl::FinalizeSnapshotWriting() {
if (channel_) {
channel_->StartClosing();
}
}
void RdbSaver::Impl::StartSnapshotting(bool stream_journal, Context* cntx, EngineShard* shard) {
auto& s = GetSnapshot(shard);
auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id());
s = std::make_unique<SliceSnapshot>(&db_slice, &channel_, compression_mode_);
auto on_snapshot_finish = std::bind(&RdbSaver::Impl::FinalizeSnapshotWriting, this);
auto push_cb = std::bind(&RdbSaver::Impl::PushSnapshotData, this, cntx, std::placeholders::_1);
s = std::make_unique<SliceSnapshot>(&db_slice, compression_mode_, push_cb, on_snapshot_finish);
const auto allow_flush = (save_mode_ != SaveMode::RDB) ? SliceSnapshot::SnapshotFlush::kAllow
: SliceSnapshot::SnapshotFlush::kDisallow;
s->Start(stream_journal, cll, allow_flush);
s->Start(stream_journal, cntx->GetCancellation(), allow_flush);
}
void RdbSaver::Impl::StartIncrementalSnapshotting(Context* cntx, EngineShard* shard,
LSN start_lsn) {
auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id());
auto& s = GetSnapshot(shard);
s = std::make_unique<SliceSnapshot>(&db_slice, &channel_, compression_mode_);
auto on_finalize_cb = std::bind(&RdbSaver::Impl::FinalizeSnapshotWriting, this);
auto push_cb = std::bind(&RdbSaver::Impl::PushSnapshotData, this, cntx, std::placeholders::_1);
s = std::make_unique<SliceSnapshot>(&db_slice, compression_mode_, push_cb, on_finalize_cb);
s->StartIncremental(cntx, start_lsn);
}
// called on save flow
void RdbSaver::Impl::WaitForSnapshottingFinish(EngineShard* shard) {
auto& snapshot = GetSnapshot(shard);
CHECK(snapshot);
snapshot->WaitSnapshotting();
}
// called from replication flow
void RdbSaver::Impl::StopSnapshotting(EngineShard* shard) {
GetSnapshot(shard)->FinalizeJournalStream(false);
auto& snapshot = GetSnapshot(shard);
CHECK(snapshot);
snapshot->FinalizeJournalStream(false);
}
void RdbSaver::Impl::CancelInShard(EngineShard* shard) {
GetSnapshot(shard)->FinalizeJournalStream(true);
auto& snapshot = GetSnapshot(shard);
if (snapshot) { // Cancel can be called before snapshotting started.
snapshot->FinalizeJournalStream(true);
}
}
// This function is called from connection thread when info command is invoked.
@ -1314,7 +1371,8 @@ size_t RdbSaver::Impl::GetTotalBuffersSize() const {
auto cb = [this, &channel_bytes, &serializer_bytes](ShardId sid) {
auto& snapshot = shard_snapshots_[sid];
channel_bytes.fetch_add(snapshot->GetTotalChannelCapacity(), memory_order_relaxed);
if (channel_.has_value())
channel_bytes.fetch_add(channel_->GetSize(), memory_order_relaxed);
serializer_bytes.store(snapshot->GetBufferCapacity() + snapshot->GetTempBuffersSize(),
memory_order_relaxed);
};
@ -1437,17 +1495,22 @@ RdbSaver::~RdbSaver() {
tlocal->DecommitMemory(ServerState::kAllMemory);
}
void RdbSaver::StartSnapshotInShard(bool stream_journal, const Cancellation* cll,
EngineShard* shard) {
impl_->StartSnapshotting(stream_journal, cll, shard);
void RdbSaver::StartSnapshotInShard(bool stream_journal, Context* cntx, EngineShard* shard) {
impl_->StartSnapshotting(stream_journal, cntx, shard);
}
void RdbSaver::StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard, LSN start_lsn) {
impl_->StartIncrementalSnapshotting(cntx, shard, start_lsn);
}
void RdbSaver::StopFullSyncInShard(EngineShard* shard) {
error_code RdbSaver::WaitSnapshotInShard(EngineShard* shard) {
impl_->WaitForSnapshottingFinish(shard);
return SaveEpilog();
}
error_code RdbSaver::StopFullSyncInShard(EngineShard* shard) {
impl_->StopSnapshotting(shard);
return SaveEpilog();
}
error_code RdbSaver::SaveHeader(const GlobalData& glob_state) {
@ -1459,16 +1522,14 @@ error_code RdbSaver::SaveHeader(const GlobalData& glob_state) {
RETURN_ON_ERR(impl_->serializer()->WriteRaw(Bytes{reinterpret_cast<uint8_t*>(magic), sz}));
RETURN_ON_ERR(SaveAux(std::move(glob_state)));
RETURN_ON_ERR(impl_->FlushSerializer());
return error_code{};
}
error_code RdbSaver::SaveBody(Context* cntx, RdbTypeFreqMap* freq_map) {
error_code RdbSaver::SaveBody(Context* cntx) {
RETURN_ON_ERR(impl_->FlushSerializer());
if (save_mode_ == SaveMode::SUMMARY) {
impl_->serializer()->SendFullSyncCut();
} else {
if (save_mode_ == SaveMode::RDB) {
VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
error_code io_error = impl_->ConsumeChannel(cntx->GetCancellation());
if (io_error) {
@ -1477,16 +1538,16 @@ error_code RdbSaver::SaveBody(Context* cntx, RdbTypeFreqMap* freq_map) {
if (cntx->GetError()) {
return cntx->GetError();
}
} else {
DCHECK(save_mode_ == SaveMode::SUMMARY);
}
RETURN_ON_ERR(SaveEpilog());
return SaveEpilog();
}
if (freq_map) {
freq_map->clear();
impl_->FillFreqMap(freq_map);
}
return error_code{};
void RdbSaver::FillFreqMap(RdbTypeFreqMap* freq_map) {
freq_map->clear();
impl_->FillFreqMap(freq_map);
}
error_code RdbSaver::SaveAux(const GlobalData& glob_state) {
@ -1523,20 +1584,7 @@ error_code RdbSaver::SaveAux(const GlobalData& glob_state) {
}
error_code RdbSaver::SaveEpilog() {
uint8_t buf[8];
uint64_t chksum;
auto& ser = *impl_->serializer();
/* EOF opcode */
RETURN_ON_ERR(ser.WriteOpcode(RDB_OPCODE_EOF));
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
chksum = 0;
absl::little_endian::Store64(buf, chksum);
RETURN_ON_ERR(ser.WriteRaw(buf));
RETURN_ON_ERR(impl_->serializer()->SendEofAndChecksum());
RETURN_ON_ERR(impl_->FlushSerializer());
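
The EOF opcode and the trailing checksum are now emitted by RdbSerializer::SendEofAndChecksum instead of being written inline by SaveEpilog. The trailer is the RDB EOF opcode followed by an 8-byte little-endian checksum, which stays zero when checksum computation is disabled so loaders skip the verification. A standalone sketch of those bytes (0xFF is the conventional RDB EOF opcode; the byte-by-byte store stands in for absl::little_endian::Store64):

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

// Illustrative sketch of the trailer written by SendEofAndChecksum:
// the EOF opcode followed by an 8-byte little-endian checksum, which is
// zero when checksum computation is disabled (loaders then skip the check).
constexpr uint8_t kEofOpcode = 0xFF;  // RDB_OPCODE_EOF

void AppendEofAndChecksum(std::vector<uint8_t>* out, uint64_t checksum) {
  out->push_back(kEofOpcode);
  for (int i = 0; i < 8; ++i)  // store the checksum little-endian, byte by byte
    out->push_back(static_cast<uint8_t>(checksum >> (8 * i)));
}

int main() {
  std::vector<uint8_t> buf;
  AppendEofAndChecksum(&buf, 0);  // checksum disabled -> zero
  for (uint8_t b : buf)
    std::printf("%02x ", b);
  std::printf("\n");  // prints: ff 00 00 00 00 00 00 00 00
}
```
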


@ -89,21 +89,26 @@ class RdbSaver {
// Initiates the serialization in the shard's thread.
// cll allows breaking in the middle.
void StartSnapshotInShard(bool stream_journal, const Cancellation* cll, EngineShard* shard);
void StartSnapshotInShard(bool stream_journal, Context* cntx, EngineShard* shard);
// Send only the incremental snapshot since start_lsn.
void StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard, LSN start_lsn);
// Stops full-sync serialization for replication in the shard's thread.
void StopFullSyncInShard(EngineShard* shard);
std::error_code StopFullSyncInShard(EngineShard* shard);
// Wait for snapshotting finish in shard thread. Called from save flows in shard thread.
std::error_code WaitSnapshotInShard(EngineShard* shard);
// Stores auxiliary (meta) values and header_info
std::error_code SaveHeader(const GlobalData& header_info);
// Writes the RDB file into sink. Waits for the serialization to finish.
// Called only for save rdb flow and save df on summary file.
std::error_code SaveBody(Context* cntx);
// Fills freq_map with the histogram of rdb types.
// freq_map can optionally be null.
std::error_code SaveBody(Context* cntx, RdbTypeFreqMap* freq_map);
void FillFreqMap(RdbTypeFreqMap* freq_map);
void CancelInShard(EngineShard* shard);
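
Taken together, the header now distinguishes two lifecycles: save flows start a non-streaming snapshot and later call WaitSnapshotInShard from the shard thread, while replication starts a journal-streaming snapshot and ends full sync with StopFullSyncInShard; both paths finish by writing the epilog. The stub below only documents that calling order; it is not the real RdbSaver and does no serialization.

```cpp
#include <iostream>
#include <system_error>

// Illustrative stub that only documents the calling order implied by the
// header above; it is not the real RdbSaver.
struct SaverSketch {
  void StartSnapshotInShard(bool stream_journal) {
    std::cout << "start snapshot, stream_journal=" << stream_journal << '\n';
  }
  std::error_code WaitSnapshotInShard() {  // save flows (RDB / DF)
    std::cout << "wait for snapshot fiber, write epilog\n";
    return {};
  }
  std::error_code StopFullSyncInShard() {  // replication: full sync -> stable sync
    std::cout << "finalize journal stream, write epilog\n";
    return {};
  }
};

int main() {
  SaverSketch save_flow;
  save_flow.StartSnapshotInShard(/*stream_journal=*/false);  // point-in-time save
  save_flow.WaitSnapshotInShard();

  SaverSketch replication_flow;
  replication_flow.StartSnapshotInShard(/*stream_journal=*/true);  // keep streaming journal
  replication_flow.StopFullSyncInShard();
}
```
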
@ -232,6 +237,7 @@ class RdbSerializer : public SerializerBase {
std::error_code SendJournalOffset(uint64_t journal_offset);
size_t GetTempBufferSize() const override;
std::error_code SendEofAndChecksum();
private:
// Might preempt if flush_fun_ is used


@ -32,16 +32,17 @@ using facade::operator""_KB;
namespace {
thread_local absl::flat_hash_set<SliceSnapshot*> tl_slice_snapshots;
constexpr size_t kMinChannelBlobSize = 32_KB;
constexpr size_t kMinBlobSize = 32_KB;
} // namespace
size_t SliceSnapshot::DbRecord::size() const {
return HeapSize(value);
}
SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode)
: db_slice_(slice), dest_(dest), compression_mode_(compression_mode) {
SliceSnapshot::SliceSnapshot(DbSlice* slice, CompressionMode compression_mode,
std::function<void(std::string)> on_push_record,
std::function<void()> on_snapshot_finish)
: db_slice_(slice),
compression_mode_(compression_mode),
on_push_(on_push_record),
on_snapshot_finish_(on_snapshot_finish) {
db_array_ = slice->databases();
tl_slice_snapshots.insert(this);
}
@ -53,7 +54,7 @@ SliceSnapshot::~SliceSnapshot() {
size_t SliceSnapshot::GetThreadLocalMemoryUsage() {
size_t mem = 0;
for (SliceSnapshot* snapshot : tl_slice_snapshots) {
mem += snapshot->GetBufferCapacity() + snapshot->GetTotalChannelCapacity();
mem += snapshot->GetBufferCapacity();
}
return mem;
}
@ -81,8 +82,8 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
flush_fun = [this, flush_threshold](size_t bytes_serialized,
RdbSerializer::FlushState flush_state) {
if (bytes_serialized > flush_threshold) {
size_t serialized = FlushChannelRecord(flush_state);
VLOG(2) << "FlushedToChannel " << serialized << " bytes";
size_t serialized = FlushSerialized(flush_state);
VLOG(2) << "FlushSerialized " << serialized << " bytes";
}
};
}
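
The flush_fun hook above keeps the same shape, it just calls FlushSerialized instead of FlushChannelRecord: once the serializer has buffered more than flush_threshold bytes, the blob is flushed and handed to the push callback. A self-contained sketch of that threshold-driven flushing (names are illustrative):

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

// Illustrative: a serializer wrapper that flushes its buffer through a
// callback whenever the buffered size crosses a threshold.
class ThresholdSerializer {
 public:
  ThresholdSerializer(std::size_t threshold, std::function<void(std::string)> flush)
      : threshold_(threshold), flush_(std::move(flush)) {}

  void Write(const std::string& chunk) {
    buffer_ += chunk;
    if (buffer_.size() > threshold_) {
      flush_(std::move(buffer_));  // hand the accumulated blob to the push path
      buffer_.clear();
    }
  }

 private:
  std::size_t threshold_;
  std::string buffer_;
  std::function<void(std::string)> flush_;
};

int main() {
  ThresholdSerializer ser(8, [](std::string blob) {
    std::cout << "flushed " << blob.size() << " bytes\n";
  });
  for (int i = 0; i < 5; ++i)
    ser.Write("abcd");  // flushes once the buffer exceeds 8 bytes
}
```
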
@ -93,11 +94,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal, cll] {
IterateBucketsFb(cll, stream_journal);
db_slice_->UnregisterOnChange(snapshot_version_);
// We stop the channel if we are performing backups (non-streaming).
// We keep the channel for replication in order to send jounal changes until we finalize.
if (!stream_journal) {
CloseRecordChannel();
}
on_snapshot_finish_();
});
}
@ -127,10 +124,8 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
journal->UnregisterOnChange(cb_id);
if (!cancel) {
serializer_->SendJournalOffset(journal->GetLsn());
PushSerializedToChannel(true);
PushSerialized(true);
}
CloseRecordChannel();
}
// The algorithm is to go over all the buckets and serialize those with
@ -175,7 +170,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
PrimeTable::Cursor next =
db_slice_->Traverse(pt, cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
cursor = next;
PushSerializedToChannel(false);
PushSerialized(false);
if (stats_.loop_serialized >= last_yield + 100) {
DVLOG(2) << "Before sleep " << ThisFiber::GetName();
@ -185,18 +180,18 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
last_yield = stats_.loop_serialized;
// Push in case other fibers (writes commands that pushed previous values)
// filled the buffer.
PushSerializedToChannel(false);
PushSerialized(false);
}
} while (cursor);
DVLOG(2) << "after loop " << ThisFiber::GetName();
PushSerializedToChannel(true);
PushSerialized(true);
} // for (dbindex)
CHECK(!serialize_bucket_running_);
if (send_full_sync_cut) {
CHECK(!serializer_->SendFullSyncCut());
PushSerializedToChannel(true);
PushSerialized(true);
}
// serialized + side_saved must be equal to the total saved.
@ -214,7 +209,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
// The replica sends the LSN of the next entry is wants to receive.
while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
serializer_->WriteJournalEntry(journal->GetEntry(lsn));
PushSerializedToChannel(false);
PushSerialized(false);
lsn++;
}
@ -236,7 +231,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
}
auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
PushSerializedToChannel(true);
PushSerialized(true);
} else {
// We stopped but we didn't manage to send the whole stream.
cntx->ReportError(
@ -321,7 +316,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
}
}
size_t SliceSnapshot::FlushChannelRecord(SerializerBase::FlushState flush_state) {
size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
io::StringFile sfile;
serializer_->FlushToSink(&sfile, flush_state);
@ -331,34 +326,34 @@ size_t SliceSnapshot::FlushChannelRecord(SerializerBase::FlushState flush_state)
uint64_t id = rec_id_++;
DVLOG(2) << "Pushing " << id;
DbRecord db_rec{.id = id, .value = std::move(sfile.val)};
fb2::NoOpLock lk;
// We create a critical section here that ensures that records are pushed in sequential order.
// As a result, it is not possible for two fiber producers to push into channel concurrently.
// As a result, it is not possible for two fiber producers to push concurrently.
// If A.id = 5, and then B.id = 6, and both are blocked here, it means that last_pushed_id_ < 4.
// Once last_pushed_id_ = 4, A will be unblocked, while B will wait until A finishes pushing and
// update last_pushed_id_ to 5.
seq_cond_.wait(lk, [&] { return id == this->last_pushed_id_ + 1; });
// Blocking point.
size_t channel_usage = dest_->Push(std::move(db_rec));
on_push_(std::move(sfile.val));
DCHECK_EQ(last_pushed_id_ + 1, id);
last_pushed_id_ = id;
seq_cond_.notify_all();
VLOG(2) << "Pushed with Serialize() " << serialized
<< " bytes, channel total usage: " << channel_usage;
VLOG(2) << "Pushed with Serialize() " << serialized;
return serialized;
}
bool SliceSnapshot::PushSerializedToChannel(bool force) {
if (!force && serializer_->SerializedLen() < kMinChannelBlobSize)
bool SliceSnapshot::PushSerialized(bool force) {
if (!force && serializer_->SerializedLen() < kMinBlobSize)
return false;
// Flush any of the leftovers to avoid interleavings
size_t serialized = FlushChannelRecord(FlushState::kFlushMidEntry);
size_t serialized = FlushSerialized(FlushState::kFlushMidEntry);
if (!delayed_entries_.empty()) {
// Async bucket serialization might have accumulated some delayed values.
@ -371,7 +366,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
} while (!delayed_entries_.empty());
// blocking point.
serialized += FlushChannelRecord(FlushState::kFlushMidEntry);
serialized += FlushSerialized(FlushState::kFlushMidEntry);
}
return serialized > 0;
}
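
The critical section described in the comment inside FlushSerialized survives the channel removal: producer fibers still hand blobs to on_push_ strictly in ascending record-id order, each one waiting until the previous id has been pushed. The sketch below reproduces that protocol with OS threads, a real mutex, and a condition variable instead of fibers and a NoOpLock:

```cpp
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative sketch of the ordering protocol: producers hold ascending
// ids and each one blocks until the record with the previous id has been
// pushed, so records reach the consumer in id order.
std::mutex mu;
std::condition_variable seq_cond;
uint64_t last_pushed_id = 0;

void PushInOrder(uint64_t id, std::vector<uint64_t>* out) {
  std::unique_lock lk(mu);
  seq_cond.wait(lk, [&] { return id == last_pushed_id + 1; });
  out->push_back(id);  // the "push" happens strictly in sequence
  last_pushed_id = id;
  seq_cond.notify_all();
}

int main() {
  std::vector<uint64_t> pushed;
  std::vector<std::thread> producers;
  for (uint64_t id = 3; id >= 1; --id)  // start producers out of order
    producers.emplace_back(PushInOrder, id, &pushed);
  for (std::thread& t : producers)
    t.join();
  for (uint64_t id : pushed)
    std::cout << id << ' ';  // prints: 1 2 3
  std::cout << '\n';
}
```
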
@ -400,7 +395,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
// To enable journal flushing to sync after non auto journal command is executed we call
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
// additional journal change to serialize, it simply invokes PushSerializedToChannel.
// additional journal change to serialize, it simply invokes PushSerialized.
std::unique_lock lk(db_slice_->GetSerializationMutex());
if (item.opcode != journal::Op::NOOP) {
serializer_->WriteJournalEntry(item.data);
@ -409,19 +404,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
if (await) {
// This is the only place that flushes in streaming mode
// once the iterate buckets fiber finished.
PushSerializedToChannel(false);
}
}
void SliceSnapshot::CloseRecordChannel() {
DCHECK(db_slice_->shard_owner()->IsMyThread());
std::unique_lock lk(db_slice_->GetSerializationMutex());
CHECK(!serialize_bucket_running_);
// Make sure we close the channel only once with a CAS check.
bool expected = false;
if (closed_chan_.compare_exchange_strong(expected, true)) {
dest_->StartClosing();
PushSerialized(false);
}
}
@ -433,10 +416,6 @@ size_t SliceSnapshot::GetBufferCapacity() const {
return serializer_->GetBufferCapacity();
}
size_t SliceSnapshot::GetTotalChannelCapacity() const {
return dest_->GetSize();
}
size_t SliceSnapshot::GetTempBuffersSize() const {
if (serializer_ == nullptr) {
return 0;


@ -8,7 +8,6 @@
#include <bitset>
#include "base/pod_array.h"
#include "core/size_tracking_channel.h"
#include "io/file.h"
#include "server/common.h"
#include "server/db_slice.h"
@ -31,35 +30,27 @@ struct Entry;
// │ SerializeBucket │ Both might fall back to a temporary serializer
// └────────────┬─────────────┘ if default is used on another db index
// │
// | Channel is left open in journal streaming mode
// | Socket is left open in journal streaming mode
// ▼
// ┌──────────────────────────┐ ┌─────────────────────────┐
// │ SerializeEntry │ ◄────────┤ OnJournalEntry │
// └─────────────┬────────────┘ └─────────────────────────┘
// │
// PushBytesToChannel Default buffer gets flushed on iteration,
// │ temporary on destruction
// PushBytes Default buffer gets flushed on iteration,
// │ temporary on destruction
// ▼
// ┌──────────────────────────────┐
// │ dest->Push(buffer) │
// │ push_cb(buffer) │
// └──────────────────────────────┘
// SliceSnapshot is used for iterating over a shard at a specified point-in-time
// and submitting all values to an output channel.
// and submitting all values to an output sink.
// In journal streaming mode, the snapshot continues submitting changes
// over the channel until explicitly stopped.
// over the sink until explicitly stopped.
class SliceSnapshot {
public:
struct DbRecord {
uint64_t id;
std::string value;
size_t size() const;
};
using RecordChannel = SizeTrackingChannel<DbRecord, base::mpmc_bounded_queue<DbRecord>>;
SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode);
SliceSnapshot(DbSlice* slice, CompressionMode compression_mode,
std::function<void(std::string)> on_push, std::function<void()> on_snapshot_finish);
~SliceSnapshot();
static size_t GetThreadLocalMemoryUsage();
@ -85,7 +76,7 @@ class SliceSnapshot {
// Waits for a regular, non journal snapshot to finish.
// Called only for non-replication, backups usecases.
void Join() {
void WaitSnapshotting() {
snapshot_fb_.JoinIfNeeded();
}
@ -114,18 +105,15 @@ class SliceSnapshot {
// Journal listener
void OnJournalEntry(const journal::JournalItem& item, bool unused_await_arg);
// Close dest channel if not closed yet.
void CloseRecordChannel();
// Push serializer's internal buffer to channel.
// Push serializer's internal buffer.
// Push regardless of buffer size if force is true.
// Return true if pushed. Can block. Is called from the snapshot thread.
bool PushSerializedToChannel(bool force);
bool PushSerialized(bool force);
// Helper function that flushes the serialized items into the RecordStream.
// Can block on the channel.
// Can block.
using FlushState = SerializerBase::FlushState;
size_t FlushChannelRecord(FlushState flush_state);
size_t FlushSerialized(FlushState flush_state);
public:
uint64_t snapshot_version() const {
@ -138,7 +126,6 @@ class SliceSnapshot {
// Get different sizes, in bytes. All disjoint.
size_t GetBufferCapacity() const;
size_t GetTotalChannelCapacity() const;
size_t GetTempBuffersSize() const;
RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;
@ -156,9 +143,6 @@ class SliceSnapshot {
DbSlice* db_slice_;
DbTableArray db_array_;
RecordChannel* dest_;
std::atomic_bool closed_chan_{false}; // true if dest_->StartClosing was already called
DbIndex current_db_;
std::unique_ptr<RdbSerializer> serializer_;
@ -184,6 +168,9 @@ class SliceSnapshot {
size_t savecb_calls = 0;
size_t keys_total = 0;
} stats_;
std::function<void(std::string)> on_push_;
std::function<void()> on_snapshot_finish_;
};
} // namespace dfly


@ -439,7 +439,7 @@ class TestDflySnapshotOnShutdown:
@pytest.mark.parametrize("format", FILE_FORMATS)
@dfly_args({**BASIC_ARGS, "dbfilename": "info-while-snapshot"})
async def test_infomemory_while_snapshoting(df_factory, format: str):
async def test_infomemory_while_snapshotting(df_factory, format: str):
instance = df_factory.create(dbfilename=f"dump_{tmp_file_name()}")
instance.start()
async_client = instance.client()