mirror of https://github.com/dragonflydb/dragonfly.git
fix(cluster): Join on specified attempt id (#3305)
**The Bug**

Before this fix, source nodes would send `FIN` entries to target nodes (in all thread flows) and would then send a `DFLYMIGRATE ACK` command to verify that all flows received the `FIN` in time. If they didn't, the source node would retry this logic in a loop until successful. The problem is that, in some rare cases, one or more of the flows would indeed be in a `FIN` state, _but of a previous `FIN` that is already outdated_. When that happens, all data between that `FIN` and the next `FIN`(s) is lost.

**The Fix**

We already have an attempt id that we send in the `DFLYMIGRATE ACK` command and get back in the response. This fix sends that same attempt id to all flows, and when joining we make sure we join on the correct (latest) attempt id. Unfortunately, we can't use the `FIN` opcode anymore, because the protocol does not send any additional metadata for that opcode. I chose to use `LSN` because it has exactly the fields that we need, and one could plausibly think of a Log Sequence Number as an attempt id, but I could change that if it's unclear or too hacky.

**Testing**

To reproduce this, one needs to lower `--slot_migration_connection_timeout_ms` significantly, say to 500ms. Without the fix, this failed on my laptop every ~2 runs. With this fix, it ran hundreds of times and never reproduced.
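For illustration only, here is a minimal standalone sketch of the invariant this change enforces (the `Flow` and `JoinOnAttempt` names are invented for the example; this is not Dragonfly's actual code): a join may only succeed once *every* flow has seen the finalize entry of the attempt currently being ACKed, so a leftover finalize entry from an earlier attempt can no longer be mistaken for completion.

```cpp
#include <atomic>
#include <cstdio>
#include <vector>

// Hypothetical stand-in for a per-thread flow on the target node.
struct Flow {
  std::atomic<long> last_attempt{-1};  // attempt id of the last finalize entry seen
};

// Join succeeds only if every flow saw the finalize entry of *this* attempt,
// not a leftover one from an earlier, timed-out attempt.
bool JoinOnAttempt(const std::vector<Flow>& flows, long attempt) {
  for (const auto& flow : flows) {
    if (flow.last_attempt.load() != attempt)
      return false;  // stale finalize: data may still be in flight on this flow
  }
  return true;
}

int main() {
  std::vector<Flow> flows(2);
  flows[0].last_attempt = 1;  // finalized on the current attempt
  flows[1].last_attempt = 0;  // still holds the previous attempt's finalize
  std::printf("join attempt 1: %s\n", JoinOnAttempt(flows, 1) ? "ok" : "retry");
}
```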
parent 8355569d46
commit 4e3bd94358
6 changed files with 47 additions and 24 deletions
```diff
@@ -938,7 +938,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
   if (!migration)
     return cntx->SendError(kIdNotFound);
 
-  if (!migration->Join()) {
+  if (!migration->Join(attempt)) {
     return cntx->SendError("Join timeout happened");
   }
 
```
```diff
@@ -53,14 +53,16 @@ class ClusterShardMigration {
         break;
       }
 
-      while (tx_data->opcode == journal::Op::FIN) {
-        VLOG(2) << "Attempt to finalize flow " << source_shard_id_;
+      while (tx_data->opcode == journal::Op::LSN) {
+        VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
+        last_attempt_.store(tx_data->lsn);
         bc->Dec();  // we can Join the flow now
         // if we get new data, attempt is failed
         if (tx_data = tx_reader.NextTxData(&reader, cntx); !tx_data) {
          VLOG(1) << "Finalized flow " << source_shard_id_;
          return;
        }
+        VLOG(2) << "Attempt failed to finalize flow " << source_shard_id_;
         bc->Add();  // the flow isn't finished so we lock it again
       }
       if (tx_data->opcode == journal::Op::PING) {
```
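As a side note on the `bc->Dec()`/`bc->Add()` pair above: a flow "offers" a join by decrementing the blocking counter when it sees a finalize entry, and withdraws the offer by incrementing again if more data arrives afterwards. Here is a toy thread-based sketch of that surface (Dragonfly's real counter is the fiber-aware one from `util`; `ToyBlockingCounter` is invented for the example):

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>

// Invented stand-in exposing the same Add/Dec/WaitFor surface used by the flows.
class ToyBlockingCounter {
 public:
  explicit ToyBlockingCounter(int n) : count_(n) {}

  void Add() {  // more data arrived after a finalize entry: lock the flow again
    std::lock_guard<std::mutex> lk(mu_);
    ++count_;
  }

  void Dec() {  // finalize entry seen: allow Join
    std::lock_guard<std::mutex> lk(mu_);
    if (--count_ == 0) cv_.notify_all();
  }

  bool WaitFor(std::chrono::milliseconds timeout) {  // true iff counter hit zero
    std::unique_lock<std::mutex> lk(mu_);
    return cv_.wait_for(lk, timeout, [&] { return count_ == 0; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int count_;
};

int main() {
  ToyBlockingCounter bc(1);
  bc.Dec();  // finalize observed: counter reaches zero
  bc.Add();  // ...but new data arrived, so the join offer is withdrawn
  std::printf("joined: %s\n",
              bc.WaitFor(std::chrono::milliseconds(10)) ? "yes" : "no");  // prints "no"
}
```

A waiter that wakes up in the window between `Dec()` and `Add()` still sees the counter at zero; that window is exactly why `Join()` below additionally compares each flow's `GetLastAttempt()` against the latest attempt id.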
```diff
@@ -70,6 +72,7 @@ class ClusterShardMigration {
       }
     }
 
     VLOG(2) << "Flow " << source_shard_id_ << " canceled";
+    bc->Dec();  // we should provide ability to join the flow
   }
 
@@ -86,6 +89,10 @@ class ClusterShardMigration {
     return {};
   }
 
+  long GetLastAttempt() const {
+    return last_attempt_.load();
+  }
+
  private:
   void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
     if (cntx->IsCancelled()) {
@@ -93,7 +100,8 @@ class ClusterShardMigration {
     }
     CHECK(tx_data.shard_cnt <= 1);  // we don't support sync for multishard execution
     if (!tx_data.IsGlobalCmd()) {
-      VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
+      VLOG(3) << "Execute cmd without sync between shards. cmd: "
+              << CmdArgList(tx_data.command.cmd_args);
       executor_.Execute(tx_data.dbid, tx_data.command);
     } else {
       // TODO check which global commands should be supported
@@ -112,6 +120,7 @@ class ClusterShardMigration {
   util::FiberSocketBase* socket_ ABSL_GUARDED_BY(mu_);
   JournalExecutor executor_;
   IncomingSlotMigration* in_migration_;
+  atomic_long last_attempt_{-1};
 };
 
 IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
```
```diff
@@ -130,16 +139,29 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
 IncomingSlotMigration::~IncomingSlotMigration() {
 }
 
-bool IncomingSlotMigration::Join() {
-  auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
-  if (bc_->WaitFor(timeout)) {
-    state_.store(MigrationState::C_FINISHED);
-    keys_number_ = cluster::GetKeyCount(slots_);
-    return true;
+bool IncomingSlotMigration::Join(long attempt) {
+  const absl::Time start = absl::Now();
+  const absl::Duration timeout =
+      absl::Milliseconds(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms));
+
+  while (true) {
+    const absl::Time now = absl::Now();
+    const absl::Duration passed = now - start;
+    VLOG(1) << "Checking whether to continue with join " << passed << " vs " << timeout;
+    if (passed >= timeout) {
+      LOG(WARNING) << "Can't join migration in time";
+      ReportError(GenericError("Can't join migration in time"));
+      return false;
+    }
+
+    if ((bc_->WaitFor(absl::ToInt64Milliseconds(timeout - passed) * 1ms)) &&
+        (std::all_of(shard_flows_.begin(), shard_flows_.end(),
+                     [&](const auto& flow) { return flow->GetLastAttempt() == attempt; }))) {
+      state_.store(MigrationState::C_FINISHED);
+      keys_number_ = cluster::GetKeyCount(slots_);
+      return true;
+    }
   }
-  LOG(WARNING) << "Can't join migration in time";
-  ReportError(GenericError("Can't join migration in time"));
-  return false;
 }
 
 void IncomingSlotMigration::Stop() {
```
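The loop above recomputes the remaining budget on every iteration, so a wake-up caused by a stale attempt shrinks the next wait instead of restarting it, and the total join time stays bounded by `--slot_migration_connection_timeout_ms`. A minimal sketch of just that deadline arithmetic (using plain `std::chrono` instead of Abseil, with a hard-coded stale wake-up for demonstration):

```cpp
#include <chrono>
#include <cstdio>
#include <thread>

int main() {
  using namespace std::chrono;
  const auto start = steady_clock::now();
  const auto timeout = milliseconds(500);  // stand-in for the flag value

  while (true) {
    const auto passed = duration_cast<milliseconds>(steady_clock::now() - start);
    if (passed >= timeout) {
      std::puts("can't join migration in time");  // budget exhausted across retries
      return 1;
    }
    // Real code: bc_->WaitFor(timeout - passed); here we simulate a wake-up
    // caused by a finalize entry from an outdated attempt.
    std::this_thread::sleep_for(milliseconds(200));
    const bool all_flows_on_latest_attempt = false;  // simulated stale attempt
    if (all_flows_on_latest_attempt) {
      std::puts("joined");
      return 0;
    }
  }
}
```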
```diff
@@ -159,7 +181,7 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou
   state_.store(MigrationState::C_SYNC);
 
   shard_flows_[shard]->Start(&cntx_, source, bc_);
-  VLOG(1) << "Incoming flow: " << shard << " finished for " << source_id_;
+  VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
 }
 
 size_t IncomingSlotMigration::GetKeyCount() const {
```
```diff
@@ -30,7 +30,7 @@ class IncomingSlotMigration {
   // Waits until all flows got FIN opcode.
   // returns true if we joined false if timeout is readed
   // After Join we still can get data due to error situation
-  [[nodiscard]] bool Join();
+  [[nodiscard]] bool Join(long attempt);
 
   // Stop migrations, can be called even after migration is finished
   void Stop();
```
```diff
@@ -65,8 +65,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
     streamer_.Cancel();
   }
 
-  void Finalize() {
-    streamer_.SendFinalize();
+  void Finalize(long attempt) {
+    streamer_.SendFinalize(attempt);
   }
 
   const dfly::GenericError GetError() const {
@@ -270,9 +270,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
     pause_fb_opt->JoinIfNeeded();
   });
 
-  auto cb = [this](util::ProactorBase* pb) {
+  auto cb = [this, attempt](util::ProactorBase* pb) {
     if (const auto* shard = EngineShard::tlocal(); shard) {
-      slot_migrations_[shard->shard_id()]->Finalize();
+      slot_migrations_[shard->shard_id()]->Finalize(attempt);
     }
   };
 
```
```diff
@@ -302,7 +302,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   }
 
   const auto attempt_res = get<int64_t>(LastResponseArgs().front().u);
-  if (attempt_res == kInvalidAttempt) {
+  if (attempt_res != attempt) {
+    LOG(WARNING) << "Incorrect attempt payload, sent " << attempt << " received " << attempt_res;
     return false;
   }
 
```
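The check is deliberately stricter than before: instead of merely rejecting `kInvalidAttempt`, the source now requires the target to echo exactly the attempt id it sent, so an ACK answered against an earlier attempt is treated as a failure and retried. A tiny sketch of the changed semantics (with `kInvalidAttempt` assumed to be the sentinel, as in the surrounding code):

```cpp
#include <cstdio>

constexpr long kInvalidAttempt = -1;  // assumed sentinel value

// Old check accepted any valid-looking echo; the new one pins the exact attempt.
bool OldAckOk(long received) { return received != kInvalidAttempt; }
bool NewAckOk(long sent, long received) { return received == sent; }

int main() {
  // An echo from a previous attempt (2) arrives while we are ACKing attempt 3:
  std::printf("old check: %s\n", OldAckOk(2) ? "accepted" : "rejected");     // accepted (the bug)
  std::printf("new check: %s\n", NewAckOk(3, 2) ? "accepted" : "rejected");  // rejected
}
```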
```diff
@@ -231,9 +231,9 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
   } while (cursor);
 }
 
-void RestoreStreamer::SendFinalize() {
-  VLOG(1) << "RestoreStreamer FIN opcode for : " << db_slice_->shard_id();
-  journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/);
+void RestoreStreamer::SendFinalize(long attempt) {
+  VLOG(1) << "RestoreStreamer LSN opcode for : " << db_slice_->shard_id() << " attempt " << attempt;
+  journal::Entry entry(journal::Op::LSN, attempt);
 
   io::StringSink sink;
   JournalWriter writer{&sink};
```
```diff
@@ -80,7 +80,7 @@ class RestoreStreamer : public JournalStreamer {
   // Cancel() must be called if Start() is called
   void Cancel() override;
 
-  void SendFinalize();
+  void SendFinalize(long attempt);
 
   bool IsSnapshotFinished() const {
     return snapshot_finished_;
```