1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Update dflycluster slot-migration-status reply (#3707)

* feat: update DFLYCLUSTER SLOT-MIGRATION-STATUS reply
This commit is contained in:
Borys 2024-09-15 09:44:40 +03:00 committed by GitHub
parent b5929f0162
commit 93de559977
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 42 additions and 42 deletions

View file

@ -114,7 +114,6 @@ using ClusterShardInfos = std::vector<ClusterShardInfo>;
// MigrationState constants are ordered in state changing order
enum class MigrationState : uint8_t {
C_NO_STATE,
C_CONNECTING,
C_SYNC,
C_ERROR,

View file

@ -679,8 +679,6 @@ void ClusterFamily::StartSlotMigrations(std::vector<MigrationInfo> migrations) {
static string_view StateToStr(MigrationState state) {
switch (state) {
case MigrationState::C_NO_STATE:
return "NO_STATE"sv;
case MigrationState::C_CONNECTING:
return "CONNECTING"sv;
case MigrationState::C_SYNC:
@ -708,15 +706,22 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
}
}
vector<string> reply;
// One entry of the DFLYCLUSTER SLOT-MIGRATION-STATUS reply. Entries are
// collected into `reply` and each one is later sent as a 5-element array in
// exactly this field order, so the order of the members below is part of the
// wire format (and of the positional aggregate initialisation in append_answer).
struct Reply {
// Migration direction, sent as a bulk string. Non-owning view.
// NOTE(review): the tests expect "in" / "out"; presumably callers pass string
// literals, so the view stays valid while `reply` is alive - confirm.
string_view direction;
// Node id this entry refers to; it is the value matched against the optional
// filter argument. Owned copy, moved in by append_answer.
string node_id;
// Textual form of the MigrationState, as produced by StateToStr().
string_view state;
// Key count reported by the migration job; sent as an integer, not a string.
size_t keys_number;
// Error text reported by the migration job; sent as a bulk string.
// NOTE(review): presumably empty when no error occurred - confirm against
// GetErrorStr().
string error;
};
vector<Reply> reply;
reply.reserve(incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size());
auto append_answer = [&reply](string_view direction, string_view node_id, string_view filter,
MigrationState state, size_t keys_number, string_view error) {
auto append_answer = [&reply](string_view direction, string node_id, string_view filter,
MigrationState state, size_t keys_number, string error) {
if (filter.empty() || filter == node_id) {
error = error.empty() ? "0" : error;
reply.push_back(absl::StrCat(direction, " ", node_id, " ", StateToStr(state),
" keys:", keys_number, " errors:", error));
reply.emplace_back(
Reply{direction, std::move(node_id), StateToStr(state), keys_number, std::move(error)});
}
};
@ -730,10 +735,14 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
m->GetKeyCount(), m->GetErrorStr());
}
if (reply.empty()) {
rb->SendSimpleString(StateToStr(MigrationState::C_NO_STATE));
} else {
rb->SendStringArr(reply);
rb->StartArray(reply.size());
for (const auto& r : reply) {
rb->StartArray(5);
rb->SendBulkString(r.direction);
rb->SendBulkString(r.node_id);
rb->SendBulkString(r.state);
rb->SendLong(r.keys_number);
rb->SendBulkString(r.error);
}
}

View file

@ -64,7 +64,7 @@ class IncomingSlotMigration {
Service& service_;
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
SlotRanges slots_;
std::atomic<MigrationState> state_ = MigrationState::C_NO_STATE;
std::atomic<MigrationState> state_ = MigrationState::C_CONNECTING;
Context cntx_;
mutable util::fb2::Mutex error_mu_;
dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_);

View file

@ -142,7 +142,6 @@ void OutgoingMigration::Finish(bool is_error) {
case MigrationState::C_FINISHED:
return; // Already finished, nothing else to do
case MigrationState::C_NO_STATE:
case MigrationState::C_CONNECTING:
should_cancel_flows = false;
break;

View file

@ -90,7 +90,7 @@ class OutgoingMigration : private ProtocolClient {
util::fb2::Fiber main_sync_fb_;
mutable util::fb2::Mutex state_mu_;
MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_NO_STATE;
MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_CONNECTING;
boost::intrusive_ptr<Transaction> tx_;

View file

@ -153,16 +153,14 @@ async def wait_for_status(admin_client, node_id, status, timeout=10):
)
async for states, breaker in tick_timer(get_status, timeout=timeout):
if type(states) != list:
states = [states]
with breaker:
assert all(status in state for state in states), states
assert len(states) != 0 and all(status == state[2] for state in states), states
async def check_for_no_state_status(admin_clients):
for client in admin_clients:
state = await client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
if state != "NO_STATE":
if len(state) != 0:
logging.debug(f"SLOT-MIGRATION-STATUS is {state}, instead of NO_STATE")
assert False
@ -1059,7 +1057,7 @@ async def test_config_consistency(df_factory: DflyInstanceFactory):
await asyncio.sleep(0.2)
await wait_for_status(nodes[0].admin_client, nodes[1].id, "CONNECTING")
await wait_for_status(nodes[1].admin_client, nodes[0].id, "NO_STATE")
await check_for_no_state_status([nodes[1].admin_client])
logging.debug("Push migration config to target node")
await push_config(json.dumps(generate_config(nodes)), [nodes[1].admin_client])
@ -1113,13 +1111,11 @@ async def test_cluster_flushall_during_migration(
await nodes[0].client.execute_command("flushall")
status1 = await nodes[1].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
)
assert (
"FINISHED"
not in (
await nodes[1].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
)
)[0]
len(status1) == 0 or "FINISHED" not in status1[0]
), "Weak test case - finished migration too early"
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
@ -1193,16 +1189,17 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
key = "KEY" + str(i)
assert await nodes[0 if (key_slot(key) // 3000) == 0 else 1].client.set(key, "value")
assert (
await nodes[0].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id
)
)[0].startswith(f"out {nodes[1].id} FINISHED keys:7")
assert (
await nodes[1].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
)
)[0].startswith(f"in {nodes[0].id} FINISHED keys:7")
status = await nodes[0].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id
)
status[0].pop()
assert status[0] == ["out", nodes[1].id, "FINISHED", 7]
status = await nodes[1].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
)
status[0].pop()
assert status[0] == ["in", nodes[0].id, "FINISHED", 7]
nodes[0].migrations = []
nodes[0].slots = [(0, 2999)]
@ -1418,13 +1415,9 @@ async def test_cluster_fuzzymigration(
res = True
for node in nodes:
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
if states != "NO_STATE":
logging.debug(states)
logging.debug(states)
for state in states:
parsed_state = re.search("([a-z]+) ([a-z0-9]+) ([A-Z]+)", state)
if parsed_state == None:
continue
direction, node_id, st = parsed_state.group(1, 2, 3)
direction, node_id, st, _, _ = state
if direction == "out":
if st == "FINISHED":
m_id = [id for id, x in enumerate(node.migrations) if x.node_id == node_id][