1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(cluster): Send number of keys for incoming and outgoing migrations. (#2858)

The number of keys in an _incoming_ migration indicates how many keys
were received, while for _outgoing_ it shows the total number. Combining
the two can provide the control plane with percentage.

This slightly modified the format of the response.

Fixes #2756
This commit is contained in:
Shahar Mike 2024-04-08 21:17:03 +03:00 committed by GitHub
parent 2e00d42fa6
commit b8693b4805
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 62 additions and 40 deletions

View file

@ -406,7 +406,7 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
} else if (sub_cmd == "FLUSHSLOTS") {
return DflyClusterFlushSlots(args, cntx);
} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
return DflyIncomingSlotMigrationStatus(args, cntx);
return DflySlotMigrationStatus(args, cntx);
}
return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
@ -609,7 +609,7 @@ bool ClusterFamily::StartSlotMigrations(std::vector<MigrationInfo> migrations,
return true;
}
static std::string_view state_to_str(MigrationState state) {
static string_view StateToStr(MigrationState state) {
switch (state) {
case MigrationState::C_NO_STATE:
return "NO_STATE"sv;
@ -626,44 +626,67 @@ static std::string_view state_to_str(MigrationState state) {
return "UNDEFINED_STATE"sv;
}
void ClusterFamily::DflyIncomingSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser(args);
static uint64_t GetKeyCount(const SlotRanges& slots) {
atomic_uint64_t keys = 0;
shard_set->pool()->Await([&](auto*) {
EngineShard* shard = EngineShard::tlocal();
if (shard == nullptr)
return;
uint64_t shard_keys = 0;
for (const SlotRange& range : slots) {
for (SlotId slot = range.start; slot <= range.end; slot++) {
shard_keys += shard->db_slice().GetSlotStats(slot).key_count;
}
}
keys.fetch_add(shard_keys);
});
return keys.load();
}
void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
CmdArgParser parser(args);
lock_guard lk(migration_mu_);
string_view node_id;
if (parser.HasNext()) {
auto node_id = parser.Next<std::string_view>();
if (auto err = parser.Error(); err)
node_id = parser.Next<std::string_view>();
if (auto err = parser.Error(); err) {
return rb->SendError(err->MakeReply());
lock_guard lk(migration_mu_);
// find incoming slot migration
for (const auto& m : incoming_migrations_jobs_) {
if (m->GetSourceID() == node_id)
return rb->SendSimpleString(state_to_str(m->GetState()));
}
// find outgoing slot migration
for (const auto& migration : outgoing_migration_jobs_) {
if (migration->GetMigrationInfo().node_id == node_id)
return rb->SendSimpleString(state_to_str(migration->GetState()));
}
} else if (auto arr_size = incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size();
arr_size != 0) {
rb->StartArray(arr_size);
const auto& send_answer = [rb](std::string_view direction, std::string_view node_id,
auto state) {
auto str = absl::StrCat(direction, " ", node_id, " ", state_to_str(state));
rb->SendSimpleString(str);
};
lock_guard lk(migration_mu_);
for (const auto& m : incoming_migrations_jobs_) {
send_answer("in", m->GetSourceID(), m->GetState());
}
for (const auto& migration : outgoing_migration_jobs_) {
send_answer("out", migration->GetMigrationInfo().node_id, migration->GetState());
}
return;
}
return rb->SendSimpleString(state_to_str(MigrationState::C_NO_STATE));
vector<string> reply;
reply.reserve(incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size());
auto append_answer = [rb, &reply](string_view direction, string_view node_id, string_view filter,
MigrationState state, const SlotRanges& slots) {
if (filter.empty() || filter == node_id) {
reply.push_back(absl::StrCat(direction, " ", node_id, " ", StateToStr(state), " ",
"keys:", GetKeyCount(slots)));
}
};
for (const auto& m : incoming_migrations_jobs_) {
append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetSlots());
}
for (const auto& migration : outgoing_migration_jobs_) {
append_answer("out", migration->GetMigrationInfo().node_id, node_id, migration->GetState(),
migration->GetSlots());
}
if (reply.empty()) {
rb->SendSimpleString(StateToStr(MigrationState::C_NO_STATE));
} else if (!node_id.empty()) {
DCHECK_EQ(reply.size(), 1UL);
rb->SendSimpleString(reply[0]);
} else {
rb->SendStringArr(reply);
}
}
void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {

View file

@ -56,7 +56,7 @@ class ClusterFamily {
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);
private: // Slots migration section
void DflyIncomingSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);
void DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);
// DFLYMIGRATE is internal command defines several steps in slots migrations process
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);

View file

@ -1042,9 +1042,8 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
await asyncio.sleep(0.5)
while (
await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0])
!= "FINISHED"
while "FINISHED" not in await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0]
):
await asyncio.sleep(0.05)
@ -1245,7 +1244,7 @@ async def test_cluster_fuzzymigration(
for node in nodes:
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
print(states)
if not all(s.endswith("FINISHED") for s in states) and not states == "NO_STATE":
if not all("FINISHED" in s for s in states) and not states == "NO_STATE":
break
else:
break

View file

@ -343,7 +343,7 @@ def migrate(args):
while True:
sync_status = send_command(target_node, ["DFLYCLUSTER", "SLOT-MIGRATION-STATUS"])
assert len(sync_status) == 1
if sync_status[0].endswith("STABLE_SYNC"):
if "STABLE_SYNC" in sync_status[0]:
break
print("Reached stable sync: ", sync_status)