mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-15 17:51:06 +00:00
chore: simplify master replication cancelation interface (#3439)
* chore: simplify master replication cancelation interface. Previously, CancelReplication did too many things, and StopReplication duplicated the same logic. This PR moves CancelReplication into the ReplicaInfo struct (as ReplicaInfo::Cancel) and reduces the code duplication around it. Signed-off-by: Roman Gershman <roman@dragonflydb.io> * Update src/server/dflycmd.cc Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com> Signed-off-by: Roman Gershman <romange@gmail.com> --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io> Signed-off-by: Roman Gershman <romange@gmail.com> Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
This commit is contained in:
parent
55d39b66ff
commit
9eacedf58e
3 changed files with 44 additions and 60 deletions
|
@ -99,6 +99,32 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
|
|||
|
||||
} // namespace
|
||||
|
||||
// Moves this replica into SyncState::CANCELLED and tears down its per-shard flows.
//
// Steps, all performed while holding the lock returned by GetExclusiveLock():
//   1. Return immediately if the replica is already CANCELLED, so repeated calls are no-ops.
//   2. Log the disconnect, set replica_state to CANCELLED and cancel cntx.
//   3. On every shard, run the flow's cleanup callback (if set) and join its full-sync fiber.
//   4. Join the context's error handler.
//
// This method does not touch any container that holds this ReplicaInfo; removing the
// entry from such a container is left to the caller.
//
// NOTE(review): `lk` lives until the end of the function, so the exclusive lock is still
// held during cntx.JoinErrorHandler(). Confirm the error handler never tries to take this
// replica's lock, otherwise the join could deadlock.
void DflyCmd::ReplicaInfo::Cancel() {
|
||||
lock_guard lk = GetExclusiveLock();
|
||||
if (replica_state == SyncState::CANCELLED) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(INFO) << "Disconnecting from replica " << address << ":" << listening_port;
|
||||
|
||||
// Mark the replica as cancelled first, then cancel its context.
|
||||
replica_state = SyncState::CANCELLED;
|
||||
cntx.Cancel();
|
||||
|
||||
// Per shard: run the flow's cleanup callback (if any) and join its full-sync fiber.
|
||||
shard_set->RunBlockingInParallel([this](EngineShard* shard) {
|
||||
FlowInfo* flow = &flows[shard->shard_id()];
|
||||
if (flow->cleanup) {
|
||||
flow->cleanup();
|
||||
}
|
||||
|
||||
flow->full_sync_fb.JoinIfNeeded();
|
||||
});
|
||||
|
||||
// Wait for the context's error handler to quit (still under the exclusive lock).
|
||||
cntx.JoinErrorHandler();
|
||||
}
|
||||
|
||||
// Constructs a DflyCmd that stores the given ServerFamily pointer in sf_.
// The constructor body is empty; no other work is done here.
DflyCmd::DflyCmd(ServerFamily* server_family) : sf_(server_family) {
|
||||
}
|
||||
|
||||
|
@ -605,18 +631,10 @@ auto DflyCmd::GetReplicaInfoFromConnection(ConnectionContext* cntx)
|
|||
}
|
||||
|
||||
void DflyCmd::OnClose(ConnectionContext* cntx) {
|
||||
unsigned session_id = cntx->conn_state.replication_info.repl_session_id;
|
||||
if (!session_id)
|
||||
unsigned sync_id = cntx->conn_state.replication_info.repl_session_id;
|
||||
if (!sync_id)
|
||||
return;
|
||||
|
||||
auto replica_ptr = GetReplicaInfo(session_id);
|
||||
if (!replica_ptr)
|
||||
return;
|
||||
|
||||
// Because CancelReplication holds the per-replica mutex,
|
||||
// aborting connection will block here until cancellation finishes.
|
||||
// This allows keeping resources alive during the cleanup phase.
|
||||
CancelReplication(session_id, replica_ptr);
|
||||
StopReplication(sync_id);
|
||||
}
|
||||
|
||||
void DflyCmd::StopReplication(uint32_t sync_id) {
|
||||
|
@ -624,42 +642,13 @@ void DflyCmd::StopReplication(uint32_t sync_id) {
|
|||
if (!replica_ptr)
|
||||
return;
|
||||
|
||||
CancelReplication(sync_id, replica_ptr);
|
||||
}
|
||||
// Because ReplicaInfo::Cancel holds the per-replica mutex,
|
||||
// aborting connection will block here until cancellation finishes.
|
||||
// This allows keeping resources alive during the cleanup phase.
|
||||
replica_ptr->Cancel();
|
||||
|
||||
void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr<ReplicaInfo> replica_ptr) {
|
||||
{
|
||||
lock_guard lk = replica_ptr->GetExclusiveLock();
|
||||
if (replica_ptr->replica_state == SyncState::CANCELLED) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(INFO) << "Disconnecting from replica " << replica_ptr->address << ":"
|
||||
<< replica_ptr->listening_port;
|
||||
|
||||
// Update replica_ptr state and cancel context.
|
||||
replica_ptr->replica_state = SyncState::CANCELLED;
|
||||
replica_ptr->cntx.Cancel();
|
||||
|
||||
// Wait for tasks to finish.
|
||||
shard_set->RunBlockingInParallel([replica_ptr](EngineShard* shard) {
|
||||
FlowInfo* flow = &replica_ptr->flows[shard->shard_id()];
|
||||
if (flow->cleanup) {
|
||||
flow->cleanup();
|
||||
}
|
||||
|
||||
flow->full_sync_fb.JoinIfNeeded();
|
||||
});
|
||||
}
|
||||
|
||||
// Remove ReplicaInfo from global map
|
||||
{
|
||||
lock_guard lk(mu_);
|
||||
replica_infos_.erase(sync_id);
|
||||
}
|
||||
|
||||
// Wait for error handler to quit.
|
||||
replica_ptr->cntx.JoinErrorHandler();
|
||||
lock_guard lk(mu_);
|
||||
replica_infos_.erase(sync_id);
|
||||
}
|
||||
|
||||
shared_ptr<DflyCmd::ReplicaInfo> DflyCmd::GetReplicaInfo(uint32_t sync_id) {
|
||||
|
@ -810,8 +799,8 @@ void DflyCmd::Shutdown() {
|
|||
pending = std::move(replica_infos_);
|
||||
}
|
||||
|
||||
for (auto [sync_id, replica_ptr] : pending) {
|
||||
CancelReplication(sync_id, replica_ptr);
|
||||
for (auto& [_, replica_ptr] : pending) {
|
||||
replica_ptr->Cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -119,6 +119,9 @@ class DflyCmd {
|
|||
return std::shared_lock{shared_mu};
|
||||
}
|
||||
|
||||
// Transition into cancelled state, run cleanup.
|
||||
void Cancel();
|
||||
|
||||
SyncState replica_state; // always guarded by shared_mu
|
||||
Context cntx;
|
||||
|
||||
|
@ -157,9 +160,6 @@ class DflyCmd {
|
|||
// Sets metadata.
|
||||
void SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version);
|
||||
|
||||
// Transition into cancelled state, run cleanup.
|
||||
void CancelReplication(uint32_t sync_id, std::shared_ptr<ReplicaInfo> replica_info_ptr);
|
||||
|
||||
private:
|
||||
// JOURNAL [START/STOP]
|
||||
// Start or stop journaling.
|
||||
|
@ -208,9 +208,6 @@ class DflyCmd {
|
|||
// Fiber that runs full sync for each flow.
|
||||
void FullSyncFb(FlowInfo* flow, Context* cntx);
|
||||
|
||||
// Main entrypoint for stopping replication.
|
||||
void StopReplication(uint32_t sync_id);
|
||||
|
||||
// Get ReplicaInfo by sync_id.
|
||||
std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id);
|
||||
|
||||
|
@ -223,6 +220,9 @@ class DflyCmd {
|
|||
facade::RedisReplyBuilder* rb);
|
||||
|
||||
private:
|
||||
// Main entrypoint for stopping replication.
|
||||
void StopReplication(uint32_t sync_id);
|
||||
|
||||
// Return a map from replication ID to lag. Lag is defined as the maximum difference
|
||||
// between the master's LSN and the last acknowledged LSN over all shards.
|
||||
std::map<uint32_t, LSN> ReplicationLagsLocked() const;
|
||||
|
|
|
@ -1186,12 +1186,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
|||
// Bonus points because this allows to continue replication with ACL users who got
|
||||
// their access revoked and reinstated
|
||||
if (cid->name() == "REPLCONF" && absl::EqualsIgnoreCase(ArgS(args_no_cmd, 0), "ACK")) {
|
||||
auto info_ptr = server_family_.GetReplicaInfoFromConnection(dfly_cntx);
|
||||
if (info_ptr) {
|
||||
unsigned session_id = dfly_cntx->conn_state.replication_info.repl_session_id;
|
||||
DCHECK(session_id);
|
||||
server_family_.GetDflyCmd()->CancelReplication(session_id, std::move(info_ptr));
|
||||
}
|
||||
server_family_.GetDflyCmd()->OnClose(dfly_cntx);
|
||||
return;
|
||||
}
|
||||
dfly_cntx->SendError(std::move(*err));
|
||||
|
|
Loading…
Reference in a new issue