From 84814a735831cbdddd5c662249dd0e114d3a61b7 Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 2 Jul 2024 14:23:54 +0300 Subject: [PATCH] fix: fix move error during migration finalization (#3253) * fix: fix Move error during migration finalization --- src/server/cluster/cluster_config.cc | 30 ++++--- src/server/cluster/cluster_config.h | 2 +- src/server/cluster/cluster_config_test.cc | 40 ++++----- src/server/cluster/cluster_defs.cc | 30 +++++++ src/server/cluster/cluster_defs.h | 83 ++++++++++++++----- src/server/cluster/cluster_family.cc | 74 +++++++++-------- src/server/cluster/incoming_slot_migration.cc | 5 +- src/server/cluster/outgoing_slot_migration.cc | 18 ++-- src/server/cluster/slot_set.h | 4 +- tests/dragonfly/cluster_test.py | 10 +-- 10 files changed, 185 insertions(+), 111 deletions(-) diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index 0be5c4727..1719adaf7 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -90,6 +90,7 @@ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, shared_ptr result(new ClusterConfig()); + result->my_id_ = my_id; result->config_ = config; for (const auto& shard : result->config_) { @@ -101,10 +102,10 @@ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, result->my_outgoing_migrations_ = shard.migrations; } else { for (const auto& m : shard.migrations) { - if (my_id == m.node_id) { + if (my_id == m.node_info.id) { auto incoming_migration = m; // for incoming migration we need the source node - incoming_migration.node_id = shard.master.id; + incoming_migration.node_info.id = shard.master.id; result->my_incoming_migrations_.push_back(std::move(incoming_migration)); } } @@ -132,7 +133,7 @@ optional GetClusterSlotRanges(const JsonType& slots) { return nullopt; } - SlotRanges ranges; + std::vector ranges; for (const auto& range : slots.array_range()) { if (!range.is_object()) { @@ -149,7 +150,7 @@ optional GetClusterSlotRanges(const JsonType& slots) { ranges.push_back({.start = start.value(), .end = end.value()}); } - return ranges; + return SlotRanges(ranges); } optional ParseClusterNode(const JsonType& json) { @@ -211,10 +212,10 @@ optional> ParseMigrations(const JsonType& json) { return nullopt; } - res.emplace_back(MigrationInfo{.slot_ranges = std::move(*slots), - .node_id = node_id.as_string(), - .ip = ip.as_string(), - .port = *port}); + res.emplace_back(MigrationInfo{ + .slot_ranges = std::move(*slots), + .node_info = + ClusterNodeInfo{.id = node_id.as_string(), .ip = ip.as_string(), .port = *port}}); } return res; } @@ -316,10 +317,17 @@ ClusterNodeInfo ClusterConfig::GetMasterNodeForSlot(SlotId id) const { CHECK_LE(id, cluster::kMaxSlotNum) << "Requesting a non-existing slot id " << id; for (const auto& shard : config_) { - for (const auto& range : shard.slot_ranges) { - if (id >= range.start && id <= range.end) { - return shard.master; + if (shard.slot_ranges.Contains(id)) { + if (shard.master.id == my_id_) { + // The only reason why this function call and shard.master == my_id_ is the slot was + // migrated + for (const auto& m : shard.migrations) { + if (m.slot_ranges.Contains(id)) { + return m.node_info; + } + } } + return shard.master; } } diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index 4099852c5..d00ee9d47 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -9,7 +9,6 @@ #include #include "src/server/cluster/slot_set.h" -#include "src/server/common.h" namespace dfly::cluster { @@ -59,6 +58,7 @@ class ClusterConfig { ClusterConfig() = default; + std::string my_id_; ClusterShardInfos config_; SlotSet my_slots_; diff --git a/src/server/cluster/cluster_config_test.cc b/src/server/cluster/cluster_config_test.cc index 5f72fb4d3..ed6c65bd9 100644 --- a/src/server/cluster/cluster_config_test.cc +++ b/src/server/cluster/cluster_config_test.cc @@ -96,7 +96,7 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) { TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) { EXPECT_EQ(ClusterConfig::CreateFromConfig( - kMyId, {{.slot_ranges = {{.start = 0, .end = 16000}}, + kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 16000}}), .master = {.id = "other", .ip = "192.168.0.100", .port = 7000}, .replicas = {}, .migrations = {}}}), @@ -105,11 +105,11 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) { TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) { EXPECT_EQ(ClusterConfig::CreateFromConfig( - kMyId, {{.slot_ranges = {{.start = 0, .end = 0x3FFF}}, + kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}), .master = {.id = "other", .ip = "192.168.0.100", .port = 7000}, .replicas = {}, .migrations = {}}, - {.slot_ranges = {{.start = 0, .end = 0}}, + {.slot_ranges = SlotRanges({{.start = 0, .end = 0}}), .master = {.id = "other2", .ip = "192.168.0.101", .port = 7001}, .replicas = {}, .migrations = {}}}), @@ -118,7 +118,7 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) { TEST_F(ClusterConfigTest, ConfigSetInvalidSlotId) { EXPECT_EQ(ClusterConfig::CreateFromConfig( - kMyId, {{.slot_ranges = {{.start = 0, .end = 0x3FFF + 1}}, + kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF + 1}}), .master = {.id = "other", .ip = "192.168.0.100", .port = 7000}, .replicas = {}, .migrations = {}}}), @@ -127,7 +127,7 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidSlotId) { TEST_F(ClusterConfigTest, ConfigSetOk) { auto config = ClusterConfig::CreateFromConfig( - kMyId, {{.slot_ranges = {{.start = 0, .end = 0x3FFF}}, + kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}), .master = {.id = "other", .ip = "192.168.0.100", .port = 7000}, .replicas = {}, .migrations = {}}}); @@ -139,7 +139,7 @@ TEST_F(ClusterConfigTest, ConfigSetOk) { TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) { auto config = ClusterConfig::CreateFromConfig( - kMyId, {{.slot_ranges = {{.start = 0, .end = 0x3FFF}}, + kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}), .master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000}, .replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}}, .migrations = {}}}); @@ -150,21 +150,21 @@ TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) { TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) { auto config = ClusterConfig::CreateFromConfig( - kMyId, {{.slot_ranges = {{.start = 0, .end = 5'000}}, + kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}), .master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000}, .replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}}, .migrations = {}}, - {.slot_ranges = {{.start = 5'001, .end = 10'000}}, + {.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}), .master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002}, .replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}}, .migrations = {}}, - {.slot_ranges = {{.start = 10'001, .end = 0x3FFF}}, + {.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}), .master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004}, .replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}}, .migrations = {}}}); EXPECT_NE(config, nullptr); SlotSet owned_slots = config->GetOwnedSlots(); - EXPECT_EQ(owned_slots.ToSlotRanges().size(), 1); + EXPECT_EQ(owned_slots.ToSlotRanges().Size(), 1); EXPECT_EQ(owned_slots.Count(), 5'000); { @@ -481,8 +481,8 @@ TEST_F(ClusterConfigTest, ConfigSetMigrations) { auto config1 = ClusterConfig::CreateFromConfig("id0", config_str); EXPECT_EQ( config1->GetNewOutgoingMigrations(nullptr), - (std::vector{ - {.slot_ranges = {{7000, 8000}}, .node_id = "id1", .ip = "127.0.0.1", .port = 9001}})); + (std::vector{{.slot_ranges = SlotRanges({{7000, 8000}}), + .node_info = {.id = "id1", .ip = "127.0.0.1", .port = 9001}}})); EXPECT_TRUE(config1->GetFinishedOutgoingMigrations(nullptr).empty()); EXPECT_TRUE(config1->GetNewIncomingMigrations(nullptr).empty()); @@ -491,8 +491,8 @@ TEST_F(ClusterConfigTest, ConfigSetMigrations) { auto config2 = ClusterConfig::CreateFromConfig("id1", config_str); EXPECT_EQ( config2->GetNewIncomingMigrations(nullptr), - (std::vector{ - {.slot_ranges = {{7000, 8000}}, .node_id = "id0", .ip = "127.0.0.1", .port = 9001}})); + (std::vector{{.slot_ranges = SlotRanges({{7000, 8000}}), + .node_info = {.id = "id0", .ip = "127.0.0.1", .port = 9001}}})); EXPECT_TRUE(config2->GetFinishedOutgoingMigrations(nullptr).empty()); EXPECT_TRUE(config2->GetNewOutgoingMigrations(nullptr).empty()); @@ -523,16 +523,16 @@ TEST_F(ClusterConfigTest, ConfigSetMigrations) { EXPECT_EQ( config4->GetFinishedOutgoingMigrations(config1), - (std::vector{ - {.slot_ranges = {{7000, 8000}}, .node_id = "id1", .ip = "127.0.0.1", .port = 9001}})); + (std::vector{{.slot_ranges = SlotRanges({{7000, 8000}}), + .node_info = {.id = "id1", .ip = "127.0.0.1", .port = 9001}}})); EXPECT_TRUE(config4->GetNewIncomingMigrations(config1).empty()); EXPECT_TRUE(config4->GetFinishedIncomingMigrations(config1).empty()); EXPECT_TRUE(config4->GetNewOutgoingMigrations(config1).empty()); EXPECT_EQ( config5->GetFinishedIncomingMigrations(config2), - (std::vector{ - {.slot_ranges = {{7000, 8000}}, .node_id = "id0", .ip = "127.0.0.1", .port = 9001}})); + (std::vector{{.slot_ranges = SlotRanges({{7000, 8000}}), + .node_info = {.id = "id0", .ip = "127.0.0.1", .port = 9001}}})); EXPECT_TRUE(config5->GetNewIncomingMigrations(config2).empty()); EXPECT_TRUE(config5->GetFinishedOutgoingMigrations(config2).empty()); EXPECT_TRUE(config5->GetNewOutgoingMigrations(config2).empty()); @@ -589,7 +589,7 @@ TEST_F(ClusterConfigTest, SlotSetAPI) { ss.Set(5010, true); EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5010, 5010}})); - ss.Set({SlotRange{5000, 5100}}, true); + ss.Set(SlotRanges({{5000, 5100}}), true); EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5100}})); ss.Set(5050, false); @@ -598,7 +598,7 @@ TEST_F(ClusterConfigTest, SlotSetAPI) { ss.Set(5500, false); EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5100}})); - ss.Set({SlotRange{5090, 5100}}, false); + ss.Set(SlotRanges({{5090, 5100}}), false); EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5089}})); SlotSet ss1(SlotRanges({{1001, 2000}})); diff --git a/src/server/cluster/cluster_defs.cc b/src/server/cluster/cluster_defs.cc index 473ae911a..e93284994 100644 --- a/src/server/cluster/cluster_defs.cc +++ b/src/server/cluster/cluster_defs.cc @@ -3,9 +3,13 @@ extern "C" { #include "redis/crc16.h" } +#include +#include + #include "base/flags.h" #include "base/logging.h" #include "cluster_defs.h" +#include "slot_set.h" #include "src/server/common.h" using namespace std; @@ -15,6 +19,32 @@ ABSL_FLAG(string, cluster_mode, "", "'emulated', 'yes' or ''"); namespace dfly::cluster { +std::string SlotRange::ToString() const { + return absl::StrCat("[", start, ", ", end, "]"); +} + +SlotRanges::SlotRanges(std::vector ranges) : ranges_(std::move(ranges)) { + std::sort(ranges_.begin(), ranges_.end()); +} + +void SlotRanges::Merge(const SlotRanges& sr) { + // TODO rewrite it + SlotSet slots(*this); + slots.Set(sr, true); + ranges_ = std::move(slots.ToSlotRanges().ranges_); +} + +std::string SlotRanges::ToString() const { + return absl::StrJoin(ranges_, ", ", [](std::string* out, SlotRange range) { + absl::StrAppend(out, range.ToString()); + }); +} + +std::string MigrationInfo::ToString() const { + return absl::StrCat(node_info.id, ",", node_info.ip, ":", node_info.port, " (", + slot_ranges.ToString(), ")"); +} + namespace { enum class ClusterMode { kUninitialized, diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h index 1206a340a..9576a4b6c 100644 --- a/src/server/cluster/cluster_defs.h +++ b/src/server/cluster/cluster_defs.h @@ -4,10 +4,7 @@ #pragma once -#include -#include - -#include +#include #include #include #include @@ -24,45 +21,85 @@ struct SlotRange { SlotId start = 0; SlotId end = 0; - bool operator==(const SlotRange& r) const { + bool operator==(const SlotRange& r) const noexcept { return start == r.start && end == r.end; } - bool IsValid() { + + bool operator<(const SlotRange& r) const noexcept { + return start < r.start || (start == r.start && end < r.end); + } + + bool IsValid() const noexcept { return start <= end && start <= kMaxSlotId && end <= kMaxSlotId; } - std::string ToString() const { - return absl::StrCat("[", start, ", ", end, "]"); + bool Contains(SlotId id) const noexcept { + return id >= start && id <= end; } - static std::string ToString(const std::vector& ranges) { - return absl::StrJoin(ranges, ", ", [](std::string* out, SlotRange range) { - absl::StrAppend(out, range.ToString()); - }); - } + std::string ToString() const; }; -using SlotRanges = std::vector; +class SlotRanges { + public: + SlotRanges() = default; + explicit SlotRanges(std::vector ranges); + + bool Contains(SlotId id) const noexcept { + for (const auto& sr : ranges_) { + if (sr.Contains(id)) + return true; + } + return false; + } + + size_t Size() const noexcept { + return ranges_.size(); + } + + bool Empty() const noexcept { + return ranges_.empty(); + } + + void Merge(const SlotRanges& sr); + + bool operator==(const SlotRanges& r) const noexcept { + return ranges_ == r.ranges_; + } + + std::string ToString() const; + + auto begin() const noexcept { + return ranges_.cbegin(); + } + + auto end() const noexcept { + return ranges_.cend(); + } + + private: + std::vector ranges_; +}; struct ClusterNodeInfo { std::string id; std::string ip; uint16_t port = 0; + + bool operator==(const ClusterNodeInfo& r) const noexcept { + return port == r.port && ip == r.ip && id == r.id; + } }; struct MigrationInfo { - std::vector slot_ranges; - std::string node_id; - std::string ip; - uint16_t port = 0; + SlotRanges slot_ranges; + ClusterNodeInfo node_info; - bool operator==(const MigrationInfo& r) const { - return ip == r.ip && port == r.port && slot_ranges == r.slot_ranges && node_id == r.node_id; + bool operator==(const MigrationInfo& r) const noexcept { + return node_info == r.node_info && slot_ranges == r.slot_ranges; } - std::string ToString() const { - return absl::StrCat(node_id, ",", ip, ":", port, " (", SlotRange::ToString(slot_ranges), ")"); - } + std::string ToString() const; }; struct ClusterShardInfo { diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 57b5eb8ff..e7ba2f4e0 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -79,7 +79,7 @@ ClusterConfig* ClusterFamily::cluster_config() { } ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const { - ClusterShardInfo info{.slot_ranges = {{.start = 0, .end = kMaxSlotNum}}, + ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}), .master = {}, .replicas = {}, .migrations = {}}; @@ -160,7 +160,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) rb->StartArray(kEntrySize); rb->SendBulkString("slots"); - rb->StartArray(shard.slot_ranges.size() * 2); + rb->StartArray(shard.slot_ranges.Size() * 2); for (const auto& slot_range : shard.slot_ranges) { rb->SendLong(slot_range.start); rb->SendLong(slot_range.end); @@ -200,7 +200,7 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) unsigned int slot_ranges = 0; for (const auto& shard : config) { - slot_ranges += shard.slot_ranges.size(); + slot_ranges += shard.slot_ranges.Size(); } rb->StartArray(slot_ranges); @@ -237,7 +237,7 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, Connec string result; auto WriteNode = [&](const ClusterNodeInfo& node, string_view role, string_view master_id, - const vector& ranges) { + const SlotRanges& ranges) { absl::StrAppend(&result, node.id, " "); absl::StrAppend(&result, node.ip, ":", node.port, "@", node.port, " "); @@ -312,7 +312,7 @@ void ClusterInfoImpl(const ClusterShardInfos& config, ConnectionContext* cntx) { known_nodes += 1; // For master known_nodes += shard_config.replicas.size(); - if (!shard_config.slot_ranges.empty()) { + if (!shard_config.slot_ranges.Empty()) { ++cluster_size; } } @@ -436,7 +436,7 @@ void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) { namespace { void DeleteSlots(const SlotRanges& slots_ranges) { - if (slots_ranges.empty()) { + if (slots_ranges.Empty()) { return; } @@ -451,13 +451,13 @@ void DeleteSlots(const SlotRanges& slots_ranges) { } void WriteFlushSlotsToJournal(const SlotRanges& slot_ranges) { - if (slot_ranges.empty()) { + if (slot_ranges.Empty()) { return; } // Build args vector args; - args.reserve(slot_ranges.size() + 1); + args.reserve(slot_ranges.Size() + 1); args.push_back("FLUSHSLOTS"); for (SlotRange range : slot_ranges) { args.push_back(absl::StrCat(range.start)); @@ -517,14 +517,12 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) // set_config_mu is unlocked and even if we apply the same changes 2 times it's not a problem for (const auto& m : incoming_migrations_jobs_) { if (m->GetState() == MigrationState::C_FINISHED) { - const auto& slots = m->GetSlots(); - enable_slots.insert(enable_slots.end(), slots.begin(), slots.end()); + enable_slots.Merge(m->GetSlots()); } } for (const auto& m : outgoing_migration_jobs_) { if (m->GetState() == MigrationState::C_FINISHED) { - const auto& slots = m->GetSlots(); - disable_slots.insert(disable_slots.end(), slots.begin(), slots.end()); + disable_slots.Merge(m->GetSlots()); } } } @@ -554,17 +552,16 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) DCHECK(tl_cluster_config != nullptr); if (!tracker.Wait(absl::Seconds(1))) { - LOG(WARNING) << "Cluster config change timed out"; + LOG(WARNING) << "Cluster config change timed for: " << MyID(); } SlotSet after = tl_cluster_config->GetOwnedSlots(); if (ServerState::tlocal()->is_master) { auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges(); - deleted_slots.insert(deleted_slots.end(), out_migrations_slots.begin(), - out_migrations_slots.end()); - LOG_IF(INFO, !deleted_slots.empty()) - << "Flushing newly unowned slots: " << SlotRange::ToString(deleted_slots); + deleted_slots.Merge(out_migrations_slots); DeleteSlots(deleted_slots); + LOG_IF(INFO, !deleted_slots.Empty()) + << "Flushing newly unowned slots: " << deleted_slots.ToString(); WriteFlushSlotsToJournal(deleted_slots); } @@ -619,7 +616,7 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c } void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx) { - SlotRanges slot_ranges; + std::vector slot_ranges; CmdArgParser parser(args); do { @@ -630,7 +627,7 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn if (auto err = parser.Error(); err) return cntx->SendError(err->MakeReply()); - DeleteSlots(slot_ranges); + DeleteSlots(SlotRanges(std::move(slot_ranges))); return cntx->SendOk(); } @@ -692,8 +689,8 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* m->GetErrorStr()); } for (const auto& m : outgoing_migration_jobs_) { - append_answer("out", m->GetMigrationInfo().node_id, node_id, m->GetState(), m->GetKeyCount(), - m->GetErrorStr()); + append_answer("out", m->GetMigrationInfo().node_info.id, node_id, m->GetState(), + m->GetKeyCount(), m->GetErrorStr()); } if (reply.empty()) { @@ -741,14 +738,14 @@ SlotRanges ClusterFamily::RemoveOutgoingMigrations(shared_ptr new auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(), [&m](const auto& om) { // we can have only one migration per target-source pair - return m.node_id == om->GetMigrationInfo().node_id; + return m.node_info.id == om->GetMigrationInfo().node_info.id; }); DCHECK(it != outgoing_migration_jobs_.end()); DCHECK(it->get() != nullptr); OutgoingMigration& migration = *it->get(); const auto& slots = migration.GetSlots(); - removed_slots.insert(removed_slots.end(), slots.begin(), slots.end()); - LOG(INFO) << "Outgoing migration cancelled: slots " << SlotRange::ToString(slots) << " to " + removed_slots.Merge(slots); + LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to " << migration.GetHostIp() << ":" << migration.GetPort(); migration.Finish(); outgoing_migration_jobs_.erase(it); @@ -789,7 +786,7 @@ bool RemoveIncomingMigrationImpl(std::vectorGetState() == MigrationState::C_FINISHED) << "Flushing slots of removed FINISHED migration " << migration->GetSourceID() - << ", slots: " << SlotRange::ToString(removed_ranges); + << ", slots: " << removed_ranges.ToString(); DeleteSlots(removed_ranges); } @@ -800,8 +797,8 @@ bool RemoveIncomingMigrationImpl(std::vector& migrations) { lock_guard lk(migration_mu_); for (const auto& m : migrations) { - RemoveIncomingMigrationImpl(incoming_migrations_jobs_, m.node_id); - VLOG(1) << "Migration was canceled from: " << m.node_id; + RemoveIncomingMigrationImpl(incoming_migrations_jobs_, m.node_info.id); + VLOG(1) << "Migration was canceled from: " << m.node_info.id; } } @@ -811,7 +808,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) { auto [source_id, flows_num] = parser.Next(); - SlotRanges slots; + std::vector slots; do { auto [slot_start, slot_end] = parser.Next(); slots.emplace_back(SlotRange{slot_start, slot_end}); @@ -824,7 +821,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) { bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(), [&](const MigrationInfo& info) { // TODO: also compare slot ranges (in an order-agnostic way) - return info.node_id == source_id; + return info.node_info.id == source_id; }); if (!found) { VLOG(1) << "Unrecognized incoming migration from " << source_id; @@ -838,7 +835,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) { LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id; incoming_migrations_jobs_.emplace_back(make_shared( - std::move(source_id), &server_family_->service(), std::move(slots), flows_num)); + std::move(source_id), &server_family_->service(), SlotRanges(std::move(slots)), flows_num)); return cntx->SendOk(); } @@ -881,8 +878,11 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) { void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, const SlotRanges& slots, bool is_incoming) { + VLOG(1) << "Update config for slots ranges: " << slots.ToString() << " for " << MyID() << " : " + << node_id; lock_guard gu(set_config_mu); lock_guard lk(migration_mu_); + bool is_migration_valid = false; if (is_incoming) { for (const auto& mj : incoming_migrations_jobs_) { @@ -893,14 +893,17 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, } } else { for (const auto& mj : outgoing_migration_jobs_) { - if (mj->GetMigrationInfo().node_id == node_id) { + if (mj->GetMigrationInfo().node_info.id == node_id) { // TODO add compare for slots is_migration_valid = true; } } } - if (!is_migration_valid) + if (!is_migration_valid) { + LOG(WARNING) << "Config wasn't updated for slots ranges: " << slots.ToString() << " for " + << MyID() << " : " << node_id; return; + } auto new_config = is_incoming ? tl_cluster_config->CloneWithChanges(slots, {}) : tl_cluster_config->CloneWithChanges({}, slots); @@ -910,6 +913,8 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, server_family_->service().proactor_pool().AwaitFiberOnAll( [&new_config](util::ProactorBase*) { tl_cluster_config = new_config; }); DCHECK(tl_cluster_config != nullptr); + VLOG(1) << "Config is updated for slots ranges: " << slots.ToString() << " for " << MyID() + << " : " << node_id; } void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { @@ -923,7 +928,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "DFLYMIGRATE ACK" << args; auto in_migrations = tl_cluster_config->GetIncomingMigrations(); auto m_it = std::find_if(in_migrations.begin(), in_migrations.end(), - [source_id](const auto& m) { return m.node_id == source_id; }); + [source_id](const auto& m) { return m.node_info.id == source_id; }); if (m_it == in_migrations.end()) { LOG(WARNING) << "migration isn't in config"; return cntx->SendLong(OutgoingMigration::kInvalidAttempt); @@ -937,10 +942,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError("Join timeout happened"); } - VLOG(1) << "Migration is joined for " << source_id; - ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true); - VLOG(1) << "Config is updated for " << MyID(); return cntx->SendLong(attempt); } diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index 8c9221eda..06938c669 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -144,7 +144,7 @@ bool IncomingSlotMigration::Join() { void IncomingSlotMigration::Stop() { string_view log_state = state_.load() == MigrationState::C_FINISHED ? "Finishing" : "Cancelling"; - LOG(INFO) << log_state << " incoming migration of slots " << SlotRange::ToString(slots_); + LOG(INFO) << log_state << " incoming migration of slots " << slots_.ToString(); cntx_.Cancel(); for (auto& flow : shard_flows_) { @@ -156,11 +156,10 @@ void IncomingSlotMigration::Stop() { } void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) { - VLOG(1) << "Start flow for shard: " << shard; state_.store(MigrationState::C_SYNC); shard_flows_[shard]->Start(&cntx_, source, bc_); - VLOG(1) << "Incoming slot migration flow for shard: " << shard << " finished"; + VLOG(1) << "Incoming flow: " << shard << " finished for " << source_id_; } size_t IncomingSlotMigration::GetKeyCount() const { diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 293e0d723..fa3e2c1b2 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -78,7 +78,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { }; OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, ServerFamily* sf) - : ProtocolClient(info.ip, info.port), + : ProtocolClient(info.node_info.ip, info.node_info.port), migration_info_(std::move(info)), slot_migrations_(shard_set->size()), server_family_(sf), @@ -107,6 +107,8 @@ bool OutgoingMigration::ChangeState(MigrationState new_state) { } void OutgoingMigration::Finish(bool is_error) { + VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : " + << migration_info_.node_info.id; bool should_cancel_flows = false; { @@ -217,20 +219,18 @@ void OutgoingMigration::SyncFb() { }); if (CheckFlowsForErrors()) { - VLOG(1) << "Errors detected, retrying outgoing migration"; + LOG(WARNING) << "Errors detected, retrying outgoing migration"; continue; } - VLOG(2) << "Migrations snapshot is finished"; - long attempt = 0; while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) { // process commands that were on pause and try again - VLOG(2) << "Waiting for migration to finalize..."; + VLOG(1) << "Waiting for migration to finalize..."; ThisFiber::SleepFor(500ms); } if (CheckFlowsForErrors()) { - VLOG(1) << "Errors detected, retrying outgoing migration"; + LOG(WARNING) << "Errors detected, retrying outgoing migration"; continue; } break; @@ -242,6 +242,7 @@ void OutgoingMigration::SyncFb() { bool OutgoingMigration::FinalizeMigration(long attempt) { // if it's not the 1st attempt and flows are work correctly we try to reconnect and ACK one more // time + VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id; if (attempt > 1) { if (CheckFlowsForErrors()) { Finish(true); @@ -271,11 +272,11 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { auto cb = [this](util::ProactorBase* pb) { if (const auto* shard = EngineShard::tlocal(); shard) { - VLOG(1) << "FINALIZE outgoing migration" << shard->shard_id(); slot_migrations_[shard->shard_id()]->Finalize(); } }; + VLOG(1) << "FINALIZE flows for " << cf_->MyID() << " : " << migration_info_.node_info.id; shard_set->pool()->AwaitFiberOnAll(std::move(cb)); auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt); @@ -309,9 +310,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { Finish(is_error); if (!is_error) { keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges); - cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_id, migration_info_.slot_ranges, + cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges, false); - VLOG(1) << "Config is updated for " << cf_->MyID(); } return true; } diff --git a/src/server/cluster/slot_set.h b/src/server/cluster/slot_set.h index aaaebe047..de54631fd 100644 --- a/src/server/cluster/slot_set.h +++ b/src/server/cluster/slot_set.h @@ -68,7 +68,7 @@ class SlotSet { } SlotRanges ToSlotRanges() const { - SlotRanges res; + std::vector res; for (SlotId i = 0; i < kSlotsNumber; ++i) { if (!slots_->test(i)) { @@ -81,7 +81,7 @@ class SlotSet { } } - return res; + return SlotRanges(res); } private: diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 526ca1471..18f04244e 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1041,7 +1041,7 @@ async def test_cluster_flushall_during_migration( df_factory.create( port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000, - vmodule="cluster_family=9,cluster_slot_migration=9,outgoing_slot_migration=9", + vmodule="cluster_family=9,cluster_slot_migration=9,outgoing_slot_migration=9,incoming_slot_migration=9", logtostdout=True, ) for i in range(2) @@ -1265,7 +1265,7 @@ async def test_cluster_fuzzymigration( df_factory.create( port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000, - vmodule="cluster_family=9,cluster_slot_migration=9", + vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9", ) for i in range(node_count) ] @@ -1292,9 +1292,6 @@ async def test_cluster_fuzzymigration( await client.lpush(key, i) except asyncio.exceptions.CancelledError: return - # TODO find the reason of TTL exhausted error and is it possible to fix it - except redis.exceptions.ClusterError: - return # Start ten counters counter_keys = [f"_counter{i}" for i in range(10)] @@ -1347,7 +1344,8 @@ async def test_cluster_fuzzymigration( res = True for node in nodes: states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") - logging.debug(states) + if states != "NO_STATE": + logging.debug(states) for state in states: parsed_state = re.search("([a-z]+) ([a-z0-9]+) ([A-Z]+)", state) if parsed_state == None: