mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
fix: join for cancel incoming migration (#3692)
This commit is contained in:
parent
e71f679386
commit
9b8aa8eab4
3 changed files with 43 additions and 12 deletions
|
@ -818,7 +818,7 @@ bool RemoveIncomingMigrationImpl(std::vector<std::shared_ptr<IncomingSlotMigrati
|
|||
SlotSet removed = migration_slots.GetRemovedSlots(tl_cluster_config->GetOwnedSlots());
|
||||
|
||||
migration->Stop();
|
||||
// all fibers has migration shared_ptr so we don't need to join it and can erase
|
||||
// all migration fibers hold a shared_ptr to the migration, so the object can be removed later
|
||||
jobs.erase(it);
|
||||
|
||||
// TODO: do this outside, in a single pass together with the other slots that should be flushed
|
||||
|
|
|
@ -28,17 +28,23 @@ using namespace facade;
|
|||
// It is created per shard on the target node to initiate the FLOW step.
|
||||
class ClusterShardMigration {
|
||||
public:
|
||||
ClusterShardMigration(uint32_t shard_id, Service* service, IncomingSlotMigration* in_migration)
|
||||
ClusterShardMigration(uint32_t shard_id, Service* service, IncomingSlotMigration* in_migration,
|
||||
util::fb2::BlockingCounter bc)
|
||||
: source_shard_id_(shard_id),
|
||||
is_finished_(false),
|
||||
socket_(nullptr),
|
||||
executor_(service),
|
||||
in_migration_(in_migration) {
|
||||
in_migration_(in_migration),
|
||||
bc_(bc) {
|
||||
}
|
||||
|
||||
void Start(Context* cntx, util::FiberSocketBase* source, util::fb2::BlockingCounter bc)
|
||||
ABSL_LOCKS_EXCLUDED(mu_) {
|
||||
void Start(Context* cntx, util::FiberSocketBase* source) ABSL_LOCKS_EXCLUDED(mu_) {
|
||||
{
|
||||
util::fb2::LockGuard lk(mu_);
|
||||
if (is_finished_) {
|
||||
return;
|
||||
}
|
||||
is_finished_ = true;
|
||||
socket_ = source;
|
||||
}
|
||||
|
||||
|
@ -60,14 +66,14 @@ class ClusterShardMigration {
|
|||
while (tx_data->opcode == journal::Op::LSN) {
|
||||
VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
|
||||
last_attempt_.store(tx_data->lsn);
|
||||
bc->Dec(); // we can Join the flow now
|
||||
bc_->Dec(); // we can Join the flow now
|
||||
// if we get new data, the attempt has failed
|
||||
if (tx_data = tx_reader.NextTxData(&reader, cntx); !tx_data) {
|
||||
VLOG(1) << "Finalized flow " << source_shard_id_;
|
||||
return;
|
||||
}
|
||||
VLOG(2) << "Attempt failed to finalize flow " << source_shard_id_;
|
||||
bc->Add(); // the flow isn't finished so we lock it again
|
||||
bc_->Add(); // the flow isn't finished so we lock it again
|
||||
}
|
||||
if (tx_data->opcode == journal::Op::PING) {
|
||||
// TODO check about ping logic
|
||||
|
@ -77,7 +83,7 @@ class ClusterShardMigration {
|
|||
}
|
||||
|
||||
VLOG(2) << "Flow " << source_shard_id_ << " canceled";
|
||||
bc->Dec(); // we should provide ability to join the flow
|
||||
bc_->Dec(); // we should provide ability to join the flow
|
||||
}
|
||||
|
||||
std::error_code Cancel() {
|
||||
|
@ -90,6 +96,11 @@ class ClusterShardMigration {
|
|||
return std::error_code();
|
||||
});
|
||||
}
|
||||
if (!is_finished_) {
|
||||
is_finished_ = true;
|
||||
bc_->Dec(); // we should provide ability to join the flow if the Start() wasn't called
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -118,9 +129,11 @@ class ClusterShardMigration {
|
|||
|
||||
uint32_t source_shard_id_;
|
||||
util::fb2::Mutex mu_;
|
||||
bool is_finished_ ABSL_GUARDED_BY(mu_);
|
||||
util::FiberSocketBase* socket_ ABSL_GUARDED_BY(mu_);
|
||||
JournalExecutor executor_;
|
||||
IncomingSlotMigration* in_migration_;
|
||||
util::fb2::BlockingCounter bc_;
|
||||
atomic_long last_attempt_{-1};
|
||||
};
|
||||
|
||||
|
@ -133,7 +146,7 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
|
|||
bc_(shards_num) {
|
||||
shard_flows_.resize(shards_num);
|
||||
for (unsigned i = 0; i < shards_num; ++i) {
|
||||
shard_flows_[i].reset(new ClusterShardMigration(i, &service_, this));
|
||||
shard_flows_[i].reset(new ClusterShardMigration(i, &service_, this, bc_));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -176,13 +189,31 @@ void IncomingSlotMigration::Stop() {
|
|||
VLOG(1) << "Error during flow Stop: " << err;
|
||||
}
|
||||
}
|
||||
bc_->Cancel();
|
||||
|
||||
// we need to Join the migration process to prevent data corruption
|
||||
const absl::Time start = absl::Now();
|
||||
const absl::Duration timeout =
|
||||
absl::Milliseconds(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms));
|
||||
|
||||
while (true) {
|
||||
const absl::Time now = absl::Now();
|
||||
const absl::Duration passed = now - start;
|
||||
VLOG_EVERY_N(1, 1000) << "Checking whether to continue with stop " << passed << " vs "
|
||||
<< timeout;
|
||||
|
||||
if (bc_->WaitFor(absl::ToInt64Milliseconds(timeout - passed) * 1ms)) {
|
||||
return;
|
||||
} else if (passed >= timeout) {
|
||||
LOG(ERROR) << "Can't stop migration in time";
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
|
||||
state_.store(MigrationState::C_SYNC);
|
||||
|
||||
shard_flows_[shard]->Start(&cntx_, source, bc_);
|
||||
shard_flows_[shard]->Start(&cntx_, source);
|
||||
VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ class IncomingSlotMigration {
|
|||
// After Join we can still receive data in an error situation
|
||||
[[nodiscard]] bool Join(long attempt);
|
||||
|
||||
// Stop migrations, can be called even after migration is finished
|
||||
// Stop and join the migration, can be called even after migration is finished
|
||||
void Stop();
|
||||
|
||||
MigrationState GetState() const {
|
||||
|
|
Loading…
Reference in a new issue