diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index 204ade766..28ec5ca1b 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -311,6 +311,12 @@ bool ClusterConfig::IsMySlot(std::string_view key) const { return IsMySlot(KeySlot(key)); } +void ClusterConfig::RemoveSlots(SlotSet slots) { + for (const auto s : slots) { + my_slots_.set(s, false); + } +} + ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const { CHECK_LT(id, my_slots_.size()) << "Requesting a non-existing slot id " << id; @@ -340,4 +346,13 @@ SlotSet ClusterConfig::GetOwnedSlots() const { return set; } +SlotSet ToSlotSet(const std::vector& slots) { + SlotSet sset; + for (const auto& slot_range : slots) { + for (auto i = slot_range.start; i <= slot_range.end; ++i) + sset.insert(i); + } + return sset; +} + } // namespace dfly diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index cfd8fd15d..57109738a 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -85,6 +85,8 @@ class ClusterConfig { bool IsMySlot(SlotId id) const; bool IsMySlot(std::string_view key) const; + void RemoveSlots(SlotSet slots); + // Returns the master configured for `id`. 
Node GetMasterNodeForSlot(SlotId id) const; @@ -106,4 +108,6 @@ class ClusterConfig { std::bitset my_slots_; }; +SlotSet ToSlotSet(const std::vector& slots); + } // namespace dfly diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 1c84974ff..e76ef34ab 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -9,6 +9,7 @@ #include #include +#include "absl/cleanup/cleanup.h" #include "base/flags.h" #include "base/logging.h" #include "core/json_object.h" @@ -712,15 +713,39 @@ void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionCont return cntx->SendError("Migration process is not in STABLE_SYNC state"); } - shard_set->pool()->AwaitFiberOnAll([migration](auto*) { - if (const auto* shard = EngineShard::tlocal(); shard) - migration->Finalize(shard->shard_id()); + // TODO implement blocking on migrated slots only + [[maybe_unused]] const auto deleted_slots = ToSlotSet(migration->GetSlotRange()); + + bool is_block_active = true; + auto is_pause_in_progress = [&is_block_active] { return is_block_active; }; + auto pause_fb_opt = + Pause(server_family_->GetListeners(), cntx->conn(), ClientPause::WRITE, is_pause_in_progress); + + if (!pause_fb_opt) { + LOG(WARNING) << "Cluster migration finalization time out"; + return cntx->SendError("Blocking connections time out"); + } + + absl::Cleanup cleanup([&is_block_active, &pause_fb_opt] { + is_block_active = false; + pause_fb_opt->JoinIfNeeded(); }); - // TODO do next after ACK - util::ThisFiber::SleepFor(500ms); + auto cb = [this, &migration](util::ProactorBase* pb) { + if (const auto* shard = EngineShard::tlocal(); shard) { + // TODO add error processing to move back into STABLE_SYNC state + migration->Finalize(shard->shard_id()); + } + }; + + shard_set->pool()->AwaitFiberOnAll(std::move(cb)); + + // TODO do next after ACK + util::ThisFiber::SleepFor(200ms); + + shard_set->pool()->AwaitFiberOnAll([&migration, 
&deleted_slots](auto*) mutable { + tl_cluster_config->RemoveSlots(deleted_slots); - shard_set->pool()->AwaitFiberOnAll([migration](auto*) { if (const auto* shard = EngineShard::tlocal(); shard) migration->Cancel(shard->shard_id()); }); diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index a88125e86..3b76e71d7 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -47,11 +47,7 @@ OutgoingMigration::~OutgoingMigration() = default; void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest) { - SlotSet sset; - for (const auto& slot_range : slots_) { - for (auto i = slot_range.start; i <= slot_range.end; ++i) - sset.insert(i); - } + SlotSet sset = ToSlotSet(slots_); const auto shard_id = slice->shard_id(); diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 770238bad..e57ee393c 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -33,10 +33,15 @@ class OutgoingMigration { const std::string& GetHostIp() const { return host_ip_; }; + uint16_t GetPort() const { return port_; }; + const std::vector& GetSlotRange() const { + return slots_; + } + private: MigrationState GetStateImpl() const; // SliceSlotMigration manages state and data transfering for the corresponding shard diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 207f96980..59ee11027 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -424,7 +424,7 @@ void ClientPauseCmd(CmdArgList args, absl::Span listeners, CmdArgParser parser(args); auto timeout = parser.Next(); - enum ClientPause pause_state = ClientPause::ALL; + ClientPause pause_state = ClientPause::ALL; if (parser.HasNext()) { pause_state = parser.ToUpper().Switch("WRITE", ClientPause::WRITE, "ALL", ClientPause::ALL); } @@ 
-432,53 +432,20 @@ void ClientPauseCmd(CmdArgList args, absl::Span listeners, return cntx->SendError(err->MakeReply()); } - // Set global pause state and track commands that are running when the pause state is flipped. - // Exlude already paused commands from the busy count. - DispatchTracker tracker{listeners, cntx->conn(), true /* ignore paused commands */}; - shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) { - // Commands don't suspend before checking the pause state, so - // it's impossible to deadlock on waiting for a command that will be paused. - tracker.TrackOnThread(); - ServerState::tlocal()->SetPauseState(pause_state, true); - }); + const auto timeout_ms = timeout * 1ms; + auto is_pause_in_progress = [end_time = chrono::steady_clock::now() + timeout_ms] { + return ServerState::tlocal()->gstate() != GlobalState::SHUTTING_DOWN && + chrono::steady_clock::now() < end_time; + }; - // TODO handle blocking commands - // Wait for all busy commands to finish running before replying to guarantee - // that no more (write) operations will occur. - const absl::Duration kDispatchTimeout = absl::Seconds(1); - if (!tracker.Wait(kDispatchTimeout)) { - LOG(WARNING) << "Couldn't wait for commands to finish dispatching in " << kDispatchTimeout; - shard_set->pool()->Await([pause_state](util::ProactorBase* pb) { - ServerState::tlocal()->SetPauseState(pause_state, false); - }); - return cntx->SendError("Failed to pause all running clients"); + if (auto pause_fb_opt = + Pause(listeners, cntx->conn(), pause_state, std::move(is_pause_in_progress)); + pause_fb_opt) { + pause_fb_opt->Detach(); + cntx->SendOk(); + } else { + cntx->SendError("Failed to pause all running clients"); } - - // We should not expire/evict keys while clients are puased. 
- shard_set->RunBriefInParallel( - [](EngineShard* shard) { shard->db_slice().SetExpireAllowed(false); }); - - fb2::Fiber("client_pause", [timeout, pause_state]() mutable { - // On server shutdown we sleep 10ms to make sure all running task finish, therefore 10ms steps - // ensure this fiber will not left hanging . - auto step = 10ms; - auto timeout_ms = timeout * 1ms; - int64_t steps = timeout_ms.count() / step.count(); - ServerState& etl = *ServerState::tlocal(); - do { - ThisFiber::SleepFor(step); - } while (etl.gstate() != GlobalState::SHUTTING_DOWN && --steps > 0); - - if (etl.gstate() != GlobalState::SHUTTING_DOWN) { - shard_set->pool()->AwaitFiberOnAll([pause_state](util::ProactorBase* pb) { - ServerState::tlocal()->SetPauseState(pause_state, false); - }); - shard_set->RunBriefInParallel( - [](EngineShard* shard) { shard->db_slice().SetExpireAllowed(true); }); - } - }).Detach(); - - cntx->SendOk(); } void ClientTracking(CmdArgList args, ConnectionContext* cntx) { @@ -586,6 +553,54 @@ string_view GetRedisMode() { } // namespace +std::optional Pause(absl::Span listeners, + facade::Connection* conn, ClientPause pause_state, + std::function is_pause_in_progress) { + // Set global pause state and track commands that are running when the pause state is flipped. + // Exclude already paused commands from the busy count. + DispatchTracker tracker{listeners, conn, true /* ignore paused commands */}; + shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) { + // Commands don't suspend before checking the pause state, so + // it's impossible to deadlock on waiting for a command that will be paused. + tracker.TrackOnThread(); + ServerState::tlocal()->SetPauseState(pause_state, true); + }); + + // TODO handle blocking commands + // Wait for all busy commands to finish running before replying to guarantee + // that no more (write) operations will occur.
+ const absl::Duration kDispatchTimeout = absl::Seconds(1); + if (!tracker.Wait(kDispatchTimeout)) { + LOG(WARNING) << "Couldn't wait for commands to finish dispatching in " << kDispatchTimeout; + shard_set->pool()->Await([pause_state](util::ProactorBase* pb) { + ServerState::tlocal()->SetPauseState(pause_state, false); + }); + return std::nullopt; + } + + // We should not expire/evict keys while clients are paused. + shard_set->RunBriefInParallel( + [](EngineShard* shard) { shard->db_slice().SetExpireAllowed(false); }); + + return fb2::Fiber("client_pause", [is_pause_in_progress, pause_state]() mutable { + // On server shutdown we sleep 10ms to make sure all running tasks finish, therefore 10ms steps + // ensure this fiber will not be left hanging. + constexpr auto step = 10ms; + while (is_pause_in_progress()) { + ThisFiber::SleepFor(step); + } + + ServerState& etl = *ServerState::tlocal(); + if (etl.gstate() != GlobalState::SHUTTING_DOWN) { + shard_set->pool()->AwaitFiberOnAll([pause_state](util::ProactorBase* pb) { + ServerState::tlocal()->SetPauseState(pause_state, false); + }); + shard_set->RunBriefInParallel( + [](EngineShard* shard) { shard->db_slice().SetExpireAllowed(true); }); + } + }); +} + ServerFamily::ServerFamily(Service* service) : service_(*service) { start_time_ = time(NULL); last_save_info_.save_time = start_time_; diff --git a/src/server/server_family.h b/src/server/server_family.h index bb2acbf41..86707e1a8 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -302,4 +302,9 @@ class ServerFamily { mutable PeakStats peak_stats_; }; +// Reusable CLIENT PAUSE implementation that blocks while polling is_pause_in_progress +std::optional Pause(absl::Span listeners, + facade::Connection* conn, ClientPause pause_state, + std::function is_pause_in_progress); + } // namespace dfly