1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: reduce delay when stop replica #3020 (#3028)

* fix: reduce delay when stop replica #3020
This commit is contained in:
Borys 2024-05-12 17:30:06 +03:00 committed by GitHub
parent a9956e9723
commit 4cd142d42c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 14 additions and 7 deletions

View file

@ -201,6 +201,10 @@ class RdbLoader : protected RdbLoaderBase {
return load_time_;
}
void stop() {
stop_early_.store(true);
}
// Return the offset that was received with a RDB_OPCODE_JOURNAL_OFFSET command,
// or 0 if no offset was received.
std::optional<uint64_t> journal_offset() const {

View file

@ -141,8 +141,8 @@ void Replica::Stop() {
// Stops the loop in MainReplicationFb.
proactor_->Await([this] {
cntx_.Cancel(); // Context is fully resposible for cleanup.
state_mask_.store(0); // Specifically ~R_ENABLED.
cntx_.Cancel(); // Context is fully resposible for cleanup.
});
// Make sure the replica fully stopped and did all cleanup,
@ -758,8 +758,7 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
DCHECK(leftover_buf_);
io::PrefixSource ps{leftover_buf_->InputBuffer(), Sock()};
RdbLoader loader(&service_);
loader.SetFullSyncCutCb([bc, ran = false]() mutable {
rdb_loader_->SetFullSyncCutCb([bc, ran = false]() mutable {
if (!ran) {
bc->Dec();
ran = true;
@ -767,13 +766,13 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
});
// Load incoming rdb stream.
if (std::error_code ec = loader.Load(&ps); ec) {
if (std::error_code ec = rdb_loader_->Load(&ps); ec) {
cntx->ReportError(ec, "Error loading rdb format");
return;
}
// Try finding eof token.
io::PrefixSource chained_tail{loader.Leftover(), &ps};
io::PrefixSource chained_tail{rdb_loader_->Leftover(), &ps};
if (!eof_token.empty()) {
unique_ptr<uint8_t[]> buf{new uint8_t[eof_token.size()]};
@ -796,14 +795,14 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
leftover_buf_.reset();
}
if (auto jo = loader.journal_offset(); jo.has_value()) {
if (auto jo = rdb_loader_->journal_offset(); jo.has_value()) {
this->journal_rec_executed_.store(*jo);
} else {
if (master_context_.version > DflyVersion::VER0)
cntx->ReportError(std::make_error_code(errc::protocol_error),
"Error finding journal offset in stream");
}
VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes";
VLOG(1) << "FullSyncDflyFb finished after reading " << rdb_loader_->bytes_read() << " bytes";
}
void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
@ -926,6 +925,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
flow_id_(flow_id) {
use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync);
executor_ = std::make_unique<JournalExecutor>(service);
rdb_loader_ = std::make_unique<RdbLoader>(&service_);
}
DflyShardReplica::~DflyShardReplica() {
@ -1168,6 +1168,7 @@ void DflyShardReplica::JoinFlow() {
}
void DflyShardReplica::Cancel() {
rdb_loader_->stop();
CloseSocket();
shard_replica_waker_.notifyAll();
}

View file

@ -175,6 +175,7 @@ class Replica : ProtocolClient {
std::optional<cluster::SlotRange> slot_range_;
};
class RdbLoader;
// This class implements a single shard replication flow from a Dragonfly master instance.
// Multiple DflyShardReplica objects are managed by a Replica object.
class DflyShardReplica : public ProtocolClient {
@ -224,6 +225,7 @@ class DflyShardReplica : public ProtocolClient {
bool use_multi_shard_exe_sync_;
std::unique_ptr<JournalExecutor> executor_;
std::unique_ptr<RdbLoader> rdb_loader_;
// The master instance has a LSN for each journal record. This counts
// the number of journal records executed in this flow plus the initial