
feat(cluster): add slots blocking when we finalize migration (#2484)

* feat(cluster): add slots blocking when we finalize the migration
Author: Borys (2024-02-07 15:19:35 +02:00), committed by GitHub
Parent: 9537cbdb0b
Commit: 72f651d527
7 changed files with 122 additions and 57 deletions


@@ -311,6 +311,12 @@ bool ClusterConfig::IsMySlot(std::string_view key) const {
return IsMySlot(KeySlot(key));
}
void ClusterConfig::RemoveSlots(SlotSet slots) {
for (const auto s : slots) {
my_slots_.set(s, false);
}
}
ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
CHECK_LT(id, my_slots_.size()) << "Requesting a non-existing slot id " << id;
@@ -340,4 +346,13 @@ SlotSet ClusterConfig::GetOwnedSlots() const {
return set;
}
SlotSet ToSlotSet(const std::vector<ClusterConfig::SlotRange>& slots) {
SlotSet sset;
for (const auto& slot_range : slots) {
for (auto i = slot_range.start; i <= slot_range.end; ++i)
sset.insert(i);
}
return sset;
}
} // namespace dfly
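
For illustration only (not part of the diff): a self-contained sketch of what the new ToSlotSet() helper computes, using std::set and a local SlotRange struct as stand-ins for dragonfly's SlotSet and ClusterConfig::SlotRange types.

#include <cstdint>
#include <set>
#include <vector>

// Stand-ins for dragonfly's types; assumptions made only for this sketch.
using SlotId = uint16_t;
using SlotSet = std::set<SlotId>;
struct SlotRange {
  SlotId start, end;
};

// Mirrors the new ToSlotSet() helper: expand inclusive slot ranges into individual ids.
SlotSet ToSlotSet(const std::vector<SlotRange>& slots) {
  SlotSet sset;
  for (const auto& slot_range : slots)
    for (SlotId i = slot_range.start; i <= slot_range.end; ++i)
      sset.insert(i);
  return sset;
}

int main() {
  // Ranges {0-2} and {5-5} expand to the four slots {0, 1, 2, 5}.
  SlotSet sset = ToSlotSet({{0, 2}, {5, 5}});
  return sset.size() == 4 ? 0 : 1;
}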


@@ -85,6 +85,8 @@ class ClusterConfig {
bool IsMySlot(SlotId id) const;
bool IsMySlot(std::string_view key) const;
void RemoveSlots(SlotSet slots);
// Returns the master configured for `id`.
Node GetMasterNodeForSlot(SlotId id) const;
@@ -106,4 +108,6 @@ class ClusterConfig {
std::bitset<kMaxSlotNum + 1> my_slots_;
};
SlotSet ToSlotSet(const std::vector<ClusterConfig::SlotRange>& slots);
} // namespace dfly


@@ -9,6 +9,7 @@
#include <mutex>
#include <string>
#include "absl/cleanup/cleanup.h"
#include "base/flags.h"
#include "base/logging.h"
#include "core/json_object.h"
@@ -712,15 +713,39 @@ void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionCont
return cntx->SendError("Migration process is not in STABLE_SYNC state");
}
shard_set->pool()->AwaitFiberOnAll([migration](auto*) {
if (const auto* shard = EngineShard::tlocal(); shard)
migration->Finalize(shard->shard_id());
// TODO implement blocking on migrated slots only
[[maybe_unused]] const auto deleted_slots = ToSlotSet(migration->GetSlotRange());
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt =
Pause(server_family_->GetListeners(), cntx->conn(), ClientPause::WRITE, is_pause_in_progress);
if (!pause_fb_opt) {
LOG(WARNING) << "Cluster migration finalization timed out";
return cntx->SendError("Blocking connections timed out");
}
absl::Cleanup cleanup([&is_block_active, &pause_fb_opt] {
is_block_active = false;
pause_fb_opt->JoinIfNeeded();
});
// TODO do next after ACK
util::ThisFiber::SleepFor(500ms);
auto cb = [this, &migration](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
// TODO add error processing to move back into STABLE_SYNC state
migration->Finalize(shard->shard_id());
}
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
// TODO do next after ACK
util::ThisFiber::SleepFor(200ms);
shard_set->pool()->AwaitFiberOnAll([&migration, &deleted_slots](auto*) mutable {
tl_cluster_config->RemoveSlots(deleted_slots);
shard_set->pool()->AwaitFiberOnAll([migration](auto*) {
if (const auto* shard = EngineShard::tlocal(); shard)
migration->Cancel(shard->shard_id());
});
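
As a rough, standalone sketch (not code from the commit) of the guard pattern used above: the write block stays in force only while is_block_active is true, and the absl::Cleanup clears the flag (and in the commit also joins the pause fiber) on every exit path, so clients are unpaused even when finalization bails out early. FinalizeWithWriteBlock and simulate_error are hypothetical names used only here.

#include <iostream>

#include "absl/cleanup/cleanup.h"

// Hypothetical wrapper illustrating the flag + cleanup pattern from the commit.
bool FinalizeWithWriteBlock(bool simulate_error) {
  bool is_block_active = true;
  auto is_pause_in_progress = [&is_block_active] { return is_block_active; };

  // Runs on every return path; the commit's version also calls pause_fb_opt->JoinIfNeeded().
  absl::Cleanup cleanup([&is_block_active] { is_block_active = false; });

  if (simulate_error)
    return false;  // early return: the cleanup still lifts the write block

  // ... Finalize() on every shard, then RemoveSlots() for the migrated slot set ...
  (void)is_pause_in_progress;
  return true;
}

int main() {
  std::cout << FinalizeWithWriteBlock(false) << ' ' << FinalizeWithWriteBlock(true) << '\n';
}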


@@ -47,11 +47,7 @@ OutgoingMigration::~OutgoingMigration() = default;
void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal,
io::Sink* dest) {
SlotSet sset;
for (const auto& slot_range : slots_) {
for (auto i = slot_range.start; i <= slot_range.end; ++i)
sset.insert(i);
}
SlotSet sset = ToSlotSet(slots_);
const auto shard_id = slice->shard_id();


@@ -33,10 +33,15 @@ class OutgoingMigration {
const std::string& GetHostIp() const {
return host_ip_;
};
uint16_t GetPort() const {
return port_;
};
const std::vector<ClusterConfig::SlotRange>& GetSlotRange() const {
return slots_;
}
private:
MigrationState GetStateImpl() const;
// SliceSlotMigration manages state and data transfer for the corresponding shard


@@ -424,7 +424,7 @@ void ClientPauseCmd(CmdArgList args, absl::Span<facade::Listener*> listeners,
CmdArgParser parser(args);
auto timeout = parser.Next<uint64_t>();
enum ClientPause pause_state = ClientPause::ALL;
ClientPause pause_state = ClientPause::ALL;
if (parser.HasNext()) {
pause_state = parser.ToUpper().Switch("WRITE", ClientPause::WRITE, "ALL", ClientPause::ALL);
}
@@ -432,53 +432,20 @@ void ClientPauseCmd(CmdArgList args, absl::Span<facade::Listener*> listeners,
return cntx->SendError(err->MakeReply());
}
// Set global pause state and track commands that are running when the pause state is flipped.
// Exclude already paused commands from the busy count.
DispatchTracker tracker{listeners, cntx->conn(), true /* ignore paused commands */};
shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) {
// Commands don't suspend before checking the pause state, so
// it's impossible to deadlock on waiting for a command that will be paused.
tracker.TrackOnThread();
ServerState::tlocal()->SetPauseState(pause_state, true);
});
const auto timeout_ms = timeout * 1ms;
auto is_pause_in_progress = [end_time = chrono::steady_clock::now() + timeout_ms] {
return ServerState::tlocal()->gstate() != GlobalState::SHUTTING_DOWN &&
chrono::steady_clock::now() < end_time;
};
// TODO handle blocking commands
// Wait for all busy commands to finish running before replying to guarantee
// that no more (write) operations will occur.
const absl::Duration kDispatchTimeout = absl::Seconds(1);
if (!tracker.Wait(kDispatchTimeout)) {
LOG(WARNING) << "Couldn't wait for commands to finish dispatching in " << kDispatchTimeout;
shard_set->pool()->Await([pause_state](util::ProactorBase* pb) {
ServerState::tlocal()->SetPauseState(pause_state, false);
});
return cntx->SendError("Failed to pause all running clients");
if (auto pause_fb_opt =
Pause(listeners, cntx->conn(), pause_state, std::move(is_pause_in_progress));
pause_fb_opt) {
pause_fb_opt->Detach();
cntx->SendOk();
} else {
cntx->SendError("Failed to pause all running clients");
}
// We should not expire/evict keys while clients are paused.
shard_set->RunBriefInParallel(
[](EngineShard* shard) { shard->db_slice().SetExpireAllowed(false); });
fb2::Fiber("client_pause", [timeout, pause_state]() mutable {
// On server shutdown we sleep 10ms to make sure all running tasks finish, therefore 10ms steps
// ensure this fiber will not be left hanging.
auto step = 10ms;
auto timeout_ms = timeout * 1ms;
int64_t steps = timeout_ms.count() / step.count();
ServerState& etl = *ServerState::tlocal();
do {
ThisFiber::SleepFor(step);
} while (etl.gstate() != GlobalState::SHUTTING_DOWN && --steps > 0);
if (etl.gstate() != GlobalState::SHUTTING_DOWN) {
shard_set->pool()->AwaitFiberOnAll([pause_state](util::ProactorBase* pb) {
ServerState::tlocal()->SetPauseState(pause_state, false);
});
shard_set->RunBriefInParallel(
[](EngineShard* shard) { shard->db_slice().SetExpireAllowed(true); });
}
}).Detach();
cntx->SendOk();
}
void ClientTracking(CmdArgList args, ConnectionContext* cntx) {
@@ -586,6 +553,54 @@ string_view GetRedisMode() {
} // namespace
std::optional<fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress) {
// Set global pause state and track commands that are running when the pause state is flipped.
// Exclude already paused commands from the busy count.
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */};
shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) {
// Commands don't suspend before checking the pause state, so
// it's impossible to deadlock on waiting for a command that will be paused.
tracker.TrackOnThread();
ServerState::tlocal()->SetPauseState(pause_state, true);
});
// TODO handle blocking commands
// Wait for all busy commands to finish running before replying to guarantee
// that no more (write) operations will occur.
const absl::Duration kDispatchTimeout = absl::Seconds(1);
if (!tracker.Wait(kDispatchTimeout)) {
LOG(WARNING) << "Couldn't wait for commands to finish dispatching in " << kDispatchTimeout;
shard_set->pool()->Await([pause_state](util::ProactorBase* pb) {
ServerState::tlocal()->SetPauseState(pause_state, false);
});
return std::nullopt;
}
// We should not expire/evict keys while clients are paused.
shard_set->RunBriefInParallel(
[](EngineShard* shard) { shard->db_slice().SetExpireAllowed(false); });
return fb2::Fiber("client_pause", [is_pause_in_progress, pause_state]() mutable {
// On server shutdown we sleep 10ms to make sure all running tasks finish, therefore 10ms steps
// ensure this fiber will not be left hanging.
constexpr auto step = 10ms;
while (is_pause_in_progress()) {
ThisFiber::SleepFor(step);
}
ServerState& etl = *ServerState::tlocal();
if (etl.gstate() != GlobalState::SHUTTING_DOWN) {
shard_set->pool()->AwaitFiberOnAll([pause_state](util::ProactorBase* pb) {
ServerState::tlocal()->SetPauseState(pause_state, false);
});
shard_set->RunBriefInParallel(
[](EngineShard* shard) { shard->db_slice().SetExpireAllowed(true); });
}
});
}
ServerFamily::ServerFamily(Service* service) : service_(*service) {
start_time_ = time(NULL);
last_save_info_.save_time = start_time_;
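
A minimal, std-only sketch (not from the commit) of the two predicate styles now handed to Pause(): ClientPauseCmd builds a deadline-based predicate from the CLIENT PAUSE timeout (in the diff it also checks for server shutdown, omitted here), while DflyClusterMigrationFinalize uses a simple flag that is cleared once the slot cutover finishes.

#include <chrono>
#include <functional>
#include <iostream>

int main() {
  using namespace std::chrono;

  // Deadline-based predicate, mirroring what ClientPauseCmd derives from its timeout argument.
  auto deadline_pred = [end_time = steady_clock::now() + milliseconds(1000)] {
    return steady_clock::now() < end_time;
  };

  // Flag-based predicate, mirroring the migration finalization path.
  bool is_block_active = true;
  auto flag_pred = [&is_block_active] { return is_block_active; };

  // Both satisfy the std::function<bool()> parameter that Pause() accepts.
  std::function<bool()> preds[] = {deadline_pred, flag_pred};
  for (const auto& p : preds)
    std::cout << std::boolalpha << p() << '\n';  // prints "true" twice
}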


@@ -302,4 +302,9 @@ class ServerFamily {
mutable PeakStats peak_stats_;
};
// Reusable CLIENT PAUSE implementation that blocks while polling is_pause_in_progress
std::optional<util::fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress);
} // namespace dfly