
feat(cluster): add migration finalization (#2507)

* feat(cluster): add migration finalization
Borys 2024-02-01 17:24:54 +02:00 committed by GitHub
parent adeac6bd27
commit 5189dae118
18 changed files with 174 additions and 25 deletions
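
Taken together with the test changes at the end of this diff, the operator flow added here is: the target node starts the migration (the command now returns a sync id instead of OK), both sides sync until STABLE_SYNC, then the source finalizes by sync id and the target eventually reports FINISHED. Below is a minimal sketch of that flow using redis-py's asyncio client; the hosts, admin ports, and node roles are illustrative assumptions, not part of this commit:

import asyncio
import redis.asyncio as redis

async def migrate_and_finalize():
    # Assumed roles: "source" currently owns slots 5200-5259, "target" receives them.
    source = redis.Redis(host="127.0.0.1", port=7001, decode_responses=True)
    target = redis.Redis(host="127.0.0.1", port=7002, decode_responses=True)

    # The target initiates the migration; the reply is now the sync id (an integer).
    sync_id = await target.execute_command(
        "DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", "7001", "5200", "5259"
    )

    # Poll until the migration reaches STABLE_SYNC.
    while await target.execute_command(
        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", "7001"
    ) != "STABLE_SYNC":
        await asyncio.sleep(0.05)

    # The source finalizes the migration by sync id (the new subcommand).
    assert await source.execute_command(
        "DFLYCLUSTER", "SLOT-MIGRATION-FINALIZE", sync_id
    ) == "OK"

    # FINISHED is reported once every per-shard flow has consumed the FIN record.
    while await target.execute_command(
        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", "7001"
    ) != "FINISHED":
        await asyncio.sleep(0.05)

asyncio.run(migrate_and_finalize())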

@@ -23,7 +23,13 @@ using SlotId = uint16_t;
using SlotSet = absl::flat_hash_set<SlotId>;
// MigrationState constants are ordered in state-transition order
enum class MigrationState : uint8_t { C_NO_STATE, C_CONNECTING, C_FULL_SYNC, C_STABLE_SYNC };
enum class MigrationState : uint8_t {
C_NO_STATE,
C_CONNECTING,
C_FULL_SYNC,
C_STABLE_SYNC,
C_FINISHED
};
class ClusterConfig {
public:

@@ -403,7 +403,9 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
} else if (sub_cmd == "START-SLOT-MIGRATION") {
return DflyClusterStartSlotMigration(args, cntx);
} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
return DflySlotMigrationStatus(args, cntx);
return DflyClusterSlotMigrationStatus(args, cntx);
} else if (sub_cmd == "SLOT-MIGRATION-FINALIZE") {
return DflyClusterMigrationFinalize(args, cntx);
}
return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
@@ -531,6 +533,9 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
tracker.TrackOnThread();
};
// TODO: find a better place for this call
RemoveFinishedIncomingMigrations();
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr);
@@ -629,7 +634,7 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon
}
node->Start(cntx);
return cntx->SendOk();
return cntx->SendLong(node->GetSyncId());
}
static std::string_view state_to_str(MigrationState state) {
@@ -642,12 +647,14 @@ static std::string_view state_to_str(MigrationState state) {
return "FULL_SYNC"sv;
case MigrationState::C_STABLE_SYNC:
return "STABLE_SYNC"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
}
DCHECK(false) << "Unknown State value " << static_cast<underlying_type_t<MigrationState>>(state);
return "UNDEFINED_STATE"sv;
}
void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser(args);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
@@ -689,6 +696,40 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
return rb->SendSimpleString(state_to_str(MigrationState::C_NO_STATE));
}
void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser{args};
auto sync_id = parser.Next<uint32_t>();
if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
}
auto migration = GetOutgoingMigration(sync_id);
if (!migration)
return cntx->SendError(kIdNotFound);
if (migration->GetState() != MigrationState::C_STABLE_SYNC) {
return cntx->SendError("Migration process is not in STABLE_SYNC state");
}
shard_set->pool()->AwaitFiberOnAll([migration](auto*) {
if (const auto* shard = EngineShard::tlocal(); shard)
migration->Finalize(shard->shard_id());
});
// TODO: proceed to the next step only after the target ACKs, instead of sleeping
util::ThisFiber::SleepFor(500ms);
shard_set->pool()->AwaitFiberOnAll([migration](auto*) {
if (const auto* shard = EngineShard::tlocal(); shard)
migration->Cancel(shard->shard_id());
});
RemoveOutgoingMigration(sync_id);
return cntx->SendOk();
}
void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
@@ -718,6 +759,19 @@ ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t
.get();
}
void ClusterFamily::RemoveFinishedIncomingMigrations() {
lock_guard lk(migration_mu_);
auto removed_items_it =
std::remove_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[](const auto& m) { return m->GetState() == MigrationState::C_FINISHED; });
incoming_migrations_jobs_.erase(removed_items_it, incoming_migrations_jobs_.end());
}
void ClusterFamily::RemoveOutgoingMigration(uint32_t sync_id) {
lock_guard lk(migration_mu_);
outgoing_migration_jobs_.erase(sync_id);
}
void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Create slot migration config";
CmdArgParser parser{args};
@@ -793,6 +847,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(kIdNotFound);
cntx->conn()->Migrate(shard_set->pool()->at(shard_id));
server_family_->journal()->StartInThread();
cntx->SendOk();
@@ -825,7 +880,6 @@ void ClusterFamily::DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* c
(*migration_it)->SetStableSyncForFlow(shard_id);
if ((*migration_it)->GetState() == MigrationState::C_STABLE_SYNC) {
(*migration_it)->Stop();
LOG(INFO) << "STABLE-SYNC state is set for sync_id " << sync_id;
}

@@ -51,7 +51,8 @@ class ClusterFamily {
private: // Slots migration section
void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx);
void DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);
void DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);
void DflyClusterMigrationFinalize(CmdArgList args, ConnectionContext* cntx);
// DFLYMIGRATE is an internal command that drives the steps of the slot migration process
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);
@@ -74,6 +75,9 @@ class ClusterFamily {
ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);
void RemoveFinishedIncomingMigrations();
void RemoveOutgoingMigration(uint32_t sync_id);
// Store migration info and create a unique session id
uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);

@@ -74,10 +74,14 @@ void ClusterShardMigration::FullSyncShardFb(Context* cntx) {
TouchIoTime();
if (!tx_data->is_ping) {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
} else {
if (tx_data->opcode == journal::Op::FIN) {
VLOG(2) << "Flow " << source_shard_id_ << " is finalized";
is_finalized_ = true;
break;
} else if (tx_data->opcode == journal::Op::PING) {
// TODO: review the ping handling logic
} else {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
}
}

@@ -31,9 +31,14 @@ class ClusterShardMigration : public ProtocolClient {
return is_stable_sync_.load();
}
bool IsFinalized() {
return is_finalized_;
}
void JoinFlow();
private:
void FullSyncShardFb(Context* cntx);
void JoinFlow();
void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx);
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx);
@@ -45,6 +50,7 @@ class ClusterShardMigration : public ProtocolClient {
std::unique_ptr<JournalExecutor> executor_;
Fiber sync_fb_;
std::atomic_bool is_stable_sync_ = false;
bool is_finalized_ = false;
};
} // namespace dfly

@@ -113,6 +113,11 @@ void ClusterSlotMigration::SetStableSyncForFlow(uint32_t flow) {
}
}
bool ClusterSlotMigration::IsFinalized() const {
return std::all_of(shard_flows_.begin(), shard_flows_.end(),
[](const auto& el) { return el->IsFinalized(); });
}
void ClusterSlotMigration::Stop() {
for (auto& flow : shard_flows_) {
flow->Cancel();
@@ -129,6 +134,10 @@ void ClusterSlotMigration::MainMigrationFb() {
LOG(WARNING) << "Error syncing with " << server().Description() << " " << ec << " "
<< ec.message();
}
if (IsFinalized()) {
state_ = MigrationState::C_FINISHED;
}
}
std::error_code ClusterSlotMigration::InitiateSlotsMigration() {
@@ -137,6 +146,13 @@ std::error_code ClusterSlotMigration::InitiateSlotsMigration() {
shard_flows_[i].reset(new ClusterShardMigration(server(), i, sync_id_, &service_));
}
absl::Cleanup cleanup = [this]() {
// We do the following operations regardless of outcome.
for (auto& flow : shard_flows_) {
flow->JoinFlow();
}
};
// Switch to new error handler that closes flow sockets.
auto err_handler = [this](const auto& ge) mutable {
// Make sure the flows are not in a state transition

@@ -36,6 +36,7 @@ class ClusterSlotMigration : ProtocolClient {
}
void SetStableSyncForFlow(uint32_t flow);
void Stop();
private:
@@ -45,6 +46,9 @@ class ClusterSlotMigration : ProtocolClient {
// Creates flows, one per shard on the source node, and manages the migration process
std::error_code InitiateSlotsMigration();
// May be called only after all flows have finished
bool IsFinalized() const;
private:
Service& service_;
Mutex flows_op_mu_;

@@ -18,10 +18,14 @@ class OutgoingMigration::SliceSlotMigration {
state_ = MigrationState::C_FULL_SYNC;
}
~SliceSlotMigration() {
void Cancel() {
streamer_.Cancel();
}
void Finalize() {
streamer_.SendFinalize();
}
MigrationState GetState() const {
return state_ == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished()
? MigrationState::C_STABLE_SYNC
@@ -56,8 +60,20 @@ void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Jou
std::make_unique<SliceSlotMigration>(slice, std::move(sset), sync_id, journal, &cntx_, dest);
}
void OutgoingMigration::Finalize(uint32_t shard_id) {
slot_migrations_[shard_id]->Finalize();
}
void OutgoingMigration::Cancel(uint32_t shard_id) {
slot_migrations_[shard_id]->Cancel();
}
MigrationState OutgoingMigration::GetState() const {
std::lock_guard lck(flows_mu_);
return GetStateImpl();
}
MigrationState OutgoingMigration::GetStateImpl() const {
MigrationState min_state = MigrationState::C_STABLE_SYNC;
for (const auto& slot_migration : slot_migrations_) {
if (slot_migration)

@@ -15,7 +15,7 @@ class Journal;
class DbSlice;
// Whole slots migration process information
// Whole outgoing slots migration manager
class OutgoingMigration {
public:
OutgoingMigration() = default;
@@ -25,6 +25,9 @@ class OutgoingMigration {
void StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest);
void Finalize(uint32_t shard_id);
void Cancel(uint32_t shard_id);
MigrationState GetState() const;
const std::string& GetHostIp() const {
@@ -35,6 +38,7 @@ };
};
private:
MigrationState GetStateImpl() const;
// SliceSlotMigration manages state and data transfer for the corresponding shard
class SliceSlotMigration;

@@ -195,7 +195,7 @@ io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
entry.dbid = dbid_;
entry.opcode = opcode;
if (opcode == journal::Op::PING) {
if (opcode == journal::Op::PING || opcode == journal::Op::FIN) {
return entry;
}

@@ -92,6 +92,15 @@ void RestoreStreamer::Start(io::Sink* dest) {
});
}
void RestoreStreamer::SendFinalize() {
VLOG(2) << "DFLYMIGRATE FINALIZE for " << sync_id_ << " : " << db_slice_->shard_id();
journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/);
JournalWriter writer{this};
writer.Write(entry);
NotifyWritten(true);
}
void RestoreStreamer::Cancel() {
fiber_cancellation_.Cancel();
snapshot_fb_.JoinIfNeeded();

@@ -60,6 +60,8 @@ class RestoreStreamer : public JournalStreamer {
// Cancel() must be called if Start() is called
void Cancel() override;
void SendFinalize();
bool IsSnapshotFinished() const {
return snapshot_finished_;
}
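
For orientation, the per-shard handshake implemented across these files can be modeled simply: the source closes each shard's stream with a FIN journal record (RestoreStreamer::SendFinalize above), and the target's flow fiber stops applying entries once it reads that record (ClusterShardMigration::FullSyncShardFb). A toy, self-contained sketch of that loop follows; the Record layout is invented for illustration, and only the PING and FIN opcode values come from the journal Op enum changed below:

from __future__ import annotations
from dataclasses import dataclass
from enum import IntEnum

class Op(IntEnum):
    PING = 13  # keep-alive, matches journal::Op::PING
    FIN = 14   # end-of-stream marker added by this commit

@dataclass
class Record:
    opcode: Op | None  # None stands in for an ordinary command entry
    payload: str = ""

def apply_flow(records: list[Record]) -> bool:
    """Apply records until FIN; True means this shard's flow finalized."""
    for rec in records:
        if rec.opcode == Op.FIN:
            return True               # like setting is_finalized_ and breaking
        if rec.opcode == Op.PING:
            continue                  # nothing to apply for a keep-alive
        print("apply:", rec.payload)  # stand-in for ExecuteTxWithNoShardSync
    return False

# The whole migration turns FINISHED once every shard flow has finalized,
# mirroring ClusterSlotMigration::IsFinalized.
assert apply_flow([Record(None, "SET k v"), Record(Op.FIN)])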

@@ -49,10 +49,12 @@ void MultiShardExecution::CancelAllBlockingEntities() {
bool TransactionData::AddEntry(journal::ParsedEntry&& entry) {
++journal_rec_count;
opcode = entry.opcode;
switch (entry.opcode) {
case journal::Op::PING:
is_ping = true;
return true;
case journal::Op::FIN:
return true;
case journal::Op::EXPIRED:
case journal::Op::COMMAND:
@@ -112,7 +114,7 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read
// Check if journal command can be executed right away.
// Expiration checks lock on master, so it never conflicts with running multi transactions.
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND ||
res->opcode == journal::Op::PING)
res->opcode == journal::Op::PING || res->opcode == journal::Op::FIN)
return TransactionData::FromSingle(std::move(res.value()));
// Otherwise, continue building multi command.

@@ -51,7 +51,7 @@ struct TransactionData {
uint32_t shard_cnt{0};
absl::InlinedVector<journal::ParsedEntry::CmdData, 1> commands{0};
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
bool is_ping = false; // For Op::PING entries.
journal::Op opcode = journal::Op::NOOP;
};
// Utility for reading TransactionData from a journal reader.

@@ -22,6 +22,7 @@ enum class Op : uint8_t {
MULTI_COMMAND = 11,
EXEC = 12,
PING = 13,
FIN = 14
};
struct EntryBase {

@@ -803,7 +803,7 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
last_io_time_ = Proactor()->GetMonotonicTimeNs();
if (!tx_data->is_ping) {
if (tx_data->opcode != journal::Op::PING) {
if (use_multi_shard_exe_sync_) {
InsertTxDataToShardResource(std::move(*tx_data));
} else {

@@ -307,8 +307,10 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
unique_shard_cnt_ = 1;
if (is_stub) // stub transactions don't migrate
DCHECK_EQ(unique_shard_id_, Shard(kv_args_.front(), shard_set->size()));
else
else {
unique_slot_checker_.Add(kv_args_.front());
unique_shard_id_ = Shard(kv_args_.front(), shard_set->size());
}
// Multi transactions that execute commands on their own (not stubs) can't shrink the backing
// array, as it still might be read by leftover callbacks.

@@ -787,7 +787,7 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
res = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "5200", "5259"
)
assert "OK" == res
assert 1 == res
while (
await c_nodes_admin[1].execute_command(
@@ -862,12 +862,9 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)
assert await c_nodes[0].set("KEY0", "value")
assert await c_nodes[0].set("KEY1", "value")
assert await c_nodes[1].set("KEY2", "value")
assert await c_nodes[1].set("KEY3", "value")
assert await c_nodes[0].set("KEY4", "value")
assert await c_nodes[0].set("KEY5", "value")
assert await c_nodes[1].set("KEY6", "value")
assert await c_nodes[1].set("KEY7", "value")
assert await c_nodes[0].set("KEY8", "value")
@@ -882,12 +879,14 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
assert await c_nodes[0].set("KEY17", "value")
assert await c_nodes[1].set("KEY18", "value")
assert await c_nodes[1].set("KEY19", "value")
assert await c_nodes[0].execute_command("DBSIZE") == 10
res = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "3000", "9000"
)
assert "OK" == res
assert 1 == res
assert await c_nodes[0].set("KEY0", "value")
assert await c_nodes[0].set("KEY1", "value")
while (
await c_nodes_admin[1].execute_command(
@@ -897,6 +896,26 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
):
await asyncio.sleep(0.05)
assert await c_nodes[0].set("KEY4", "value")
assert await c_nodes[0].set("KEY5", "value")
assert await c_nodes[0].execute_command("DBSIZE") == 10
# TODO remove when we add slot blocking
await asyncio.sleep(0.5)
res = await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-FINALIZE", "1")
assert "OK" == res
await asyncio.sleep(0.5)
while (
await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
)
!= "FINISHED"
):
await asyncio.sleep(0.05)
await push_config(
config.replace("LAST_SLOT_CUTOFF", "2999").replace("NEXT_SLOT_CUTOFF", "3000"),
c_nodes_admin,