
fix: fix move error during migration finalization (#3253)

* fix: fix Move error during migration finalization
Authored by Borys on 2024-07-02 14:23:54 +03:00, committed by GitHub
parent 506ecbc3cd
commit 84814a7358
10 changed files with 185 additions and 111 deletions
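
The core of the fix is in ClusterConfig::GetMasterNodeForSlot: while an outgoing migration is being finalized, the cluster config still lists the source node as the slot's master, so a client asking the source about an already-migrated slot kept being pointed back at it. With this change, when the queried node is the configured master but the slot is covered by one of its migrations, the lookup answers with the migration target instead. Below is a minimal, self-contained sketch of that redirect logic using simplified stand-in types (illustrative only, not the actual Dragonfly headers):

// Sketch of the new GetMasterNodeForSlot behavior (simplified stand-in types).
#include <cstdint>
#include <string>
#include <vector>

using SlotId = uint16_t;

struct SlotRange {
  SlotId start = 0, end = 0;
  bool Contains(SlotId id) const { return id >= start && id <= end; }
};

struct NodeInfo {
  std::string id, ip;
  uint16_t port = 0;
};

struct MigrationInfo {
  std::vector<SlotRange> slot_ranges;  // slots being moved
  NodeInfo node_info;                  // target node of the outgoing migration
};

struct ShardInfo {
  std::vector<SlotRange> slot_ranges;
  NodeInfo master;
  std::vector<MigrationInfo> migrations;
};

// Old behavior: always return shard.master. New behavior: if we are the
// configured master but the slot was handed off by one of our migrations,
// answer with the migration target so the client is redirected to the new owner.
NodeInfo GetMasterNodeForSlot(const std::vector<ShardInfo>& config, const std::string& my_id,
                              SlotId id) {
  for (const auto& shard : config) {
    for (const auto& range : shard.slot_ranges) {
      if (!range.Contains(id))
        continue;
      if (shard.master.id == my_id) {
        for (const auto& m : shard.migrations) {
          for (const auto& r : m.slot_ranges) {
            if (r.Contains(id))
              return m.node_info;
          }
        }
      }
      return shard.master;
    }
  }
  return {};  // not reached when the config covers all slots
}

The trailing return of shard.master keeps the pre-fix behavior for any owned slot that is not covered by a migration.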


@ -90,6 +90,7 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
shared_ptr<ClusterConfig> result(new ClusterConfig());
result->my_id_ = my_id;
result->config_ = config;
for (const auto& shard : result->config_) {
@ -101,10 +102,10 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
result->my_outgoing_migrations_ = shard.migrations;
} else {
for (const auto& m : shard.migrations) {
if (my_id == m.node_id) {
if (my_id == m.node_info.id) {
auto incoming_migration = m;
// for incoming migration we need the source node
incoming_migration.node_id = shard.master.id;
incoming_migration.node_info.id = shard.master.id;
result->my_incoming_migrations_.push_back(std::move(incoming_migration));
}
}
@ -132,7 +133,7 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
return nullopt;
}
SlotRanges ranges;
std::vector<SlotRange> ranges;
for (const auto& range : slots.array_range()) {
if (!range.is_object()) {
@ -149,7 +150,7 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
ranges.push_back({.start = start.value(), .end = end.value()});
}
return ranges;
return SlotRanges(ranges);
}
optional<ClusterNodeInfo> ParseClusterNode(const JsonType& json) {
@ -211,10 +212,10 @@ optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
return nullopt;
}
res.emplace_back(MigrationInfo{.slot_ranges = std::move(*slots),
.node_id = node_id.as_string(),
.ip = ip.as_string(),
.port = *port});
res.emplace_back(MigrationInfo{
.slot_ranges = std::move(*slots),
.node_info =
ClusterNodeInfo{.id = node_id.as_string(), .ip = ip.as_string(), .port = *port}});
}
return res;
}
@ -316,10 +317,17 @@ ClusterNodeInfo ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
CHECK_LE(id, cluster::kMaxSlotNum) << "Requesting a non-existing slot id " << id;
for (const auto& shard : config_) {
for (const auto& range : shard.slot_ranges) {
if (id >= range.start && id <= range.end) {
return shard.master;
if (shard.slot_ranges.Contains(id)) {
if (shard.master.id == my_id_) {
// The only reason why this function call and shard.master == my_id_ is the slot was
// migrated
for (const auto& m : shard.migrations) {
if (m.slot_ranges.Contains(id)) {
return m.node_info;
}
}
}
return shard.master;
}
}


@ -9,7 +9,6 @@
#include <vector>
#include "src/server/cluster/slot_set.h"
#include "src/server/common.h"
namespace dfly::cluster {
@ -59,6 +58,7 @@ class ClusterConfig {
ClusterConfig() = default;
std::string my_id_;
ClusterShardInfos config_;
SlotSet my_slots_;


@ -96,7 +96,7 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = {{.start = 0, .end = 16000}},
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 16000}}),
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {},
.migrations = {}}}),
@ -105,11 +105,11 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = {{.start = 0, .end = 0x3FFF}},
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {},
.migrations = {}},
{.slot_ranges = {{.start = 0, .end = 0}},
{.slot_ranges = SlotRanges({{.start = 0, .end = 0}}),
.master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
.replicas = {},
.migrations = {}}}),
@ -118,7 +118,7 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) {
TEST_F(ClusterConfigTest, ConfigSetInvalidSlotId) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = {{.start = 0, .end = 0x3FFF + 1}},
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF + 1}}),
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {},
.migrations = {}}}),
@ -127,7 +127,7 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidSlotId) {
TEST_F(ClusterConfigTest, ConfigSetOk) {
auto config = ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = {{.start = 0, .end = 0x3FFF}},
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {},
.migrations = {}}});
@ -139,7 +139,7 @@ TEST_F(ClusterConfigTest, ConfigSetOk) {
TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) {
auto config = ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = {{.start = 0, .end = 0x3FFF}},
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
.migrations = {}}});
@ -150,21 +150,21 @@ TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) {
TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
auto config = ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = {{.start = 0, .end = 5'000}},
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}),
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
.migrations = {}},
{.slot_ranges = {{.start = 5'001, .end = 10'000}},
{.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}),
.master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002},
.replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}},
.migrations = {}},
{.slot_ranges = {{.start = 10'001, .end = 0x3FFF}},
{.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}),
.master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
.replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}},
.migrations = {}}});
EXPECT_NE(config, nullptr);
SlotSet owned_slots = config->GetOwnedSlots();
EXPECT_EQ(owned_slots.ToSlotRanges().size(), 1);
EXPECT_EQ(owned_slots.ToSlotRanges().Size(), 1);
EXPECT_EQ(owned_slots.Count(), 5'000);
{
@ -481,8 +481,8 @@ TEST_F(ClusterConfigTest, ConfigSetMigrations) {
auto config1 = ClusterConfig::CreateFromConfig("id0", config_str);
EXPECT_EQ(
config1->GetNewOutgoingMigrations(nullptr),
(std::vector<MigrationInfo>{
{.slot_ranges = {{7000, 8000}}, .node_id = "id1", .ip = "127.0.0.1", .port = 9001}}));
(std::vector<MigrationInfo>{{.slot_ranges = SlotRanges({{7000, 8000}}),
.node_info = {.id = "id1", .ip = "127.0.0.1", .port = 9001}}}));
EXPECT_TRUE(config1->GetFinishedOutgoingMigrations(nullptr).empty());
EXPECT_TRUE(config1->GetNewIncomingMigrations(nullptr).empty());
@ -491,8 +491,8 @@ TEST_F(ClusterConfigTest, ConfigSetMigrations) {
auto config2 = ClusterConfig::CreateFromConfig("id1", config_str);
EXPECT_EQ(
config2->GetNewIncomingMigrations(nullptr),
(std::vector<MigrationInfo>{
{.slot_ranges = {{7000, 8000}}, .node_id = "id0", .ip = "127.0.0.1", .port = 9001}}));
(std::vector<MigrationInfo>{{.slot_ranges = SlotRanges({{7000, 8000}}),
.node_info = {.id = "id0", .ip = "127.0.0.1", .port = 9001}}}));
EXPECT_TRUE(config2->GetFinishedOutgoingMigrations(nullptr).empty());
EXPECT_TRUE(config2->GetNewOutgoingMigrations(nullptr).empty());
@ -523,16 +523,16 @@ TEST_F(ClusterConfigTest, ConfigSetMigrations) {
EXPECT_EQ(
config4->GetFinishedOutgoingMigrations(config1),
(std::vector<MigrationInfo>{
{.slot_ranges = {{7000, 8000}}, .node_id = "id1", .ip = "127.0.0.1", .port = 9001}}));
(std::vector<MigrationInfo>{{.slot_ranges = SlotRanges({{7000, 8000}}),
.node_info = {.id = "id1", .ip = "127.0.0.1", .port = 9001}}}));
EXPECT_TRUE(config4->GetNewIncomingMigrations(config1).empty());
EXPECT_TRUE(config4->GetFinishedIncomingMigrations(config1).empty());
EXPECT_TRUE(config4->GetNewOutgoingMigrations(config1).empty());
EXPECT_EQ(
config5->GetFinishedIncomingMigrations(config2),
(std::vector<MigrationInfo>{
{.slot_ranges = {{7000, 8000}}, .node_id = "id0", .ip = "127.0.0.1", .port = 9001}}));
(std::vector<MigrationInfo>{{.slot_ranges = SlotRanges({{7000, 8000}}),
.node_info = {.id = "id0", .ip = "127.0.0.1", .port = 9001}}}));
EXPECT_TRUE(config5->GetNewIncomingMigrations(config2).empty());
EXPECT_TRUE(config5->GetFinishedOutgoingMigrations(config2).empty());
EXPECT_TRUE(config5->GetNewOutgoingMigrations(config2).empty());
@ -589,7 +589,7 @@ TEST_F(ClusterConfigTest, SlotSetAPI) {
ss.Set(5010, true);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5010, 5010}}));
ss.Set({SlotRange{5000, 5100}}, true);
ss.Set(SlotRanges({{5000, 5100}}), true);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5100}}));
ss.Set(5050, false);
@ -598,7 +598,7 @@ TEST_F(ClusterConfigTest, SlotSetAPI) {
ss.Set(5500, false);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5100}}));
ss.Set({SlotRange{5090, 5100}}, false);
ss.Set(SlotRanges({{5090, 5100}}), false);
EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5089}}));
SlotSet ss1(SlotRanges({{1001, 2000}}));


@ -3,9 +3,13 @@ extern "C" {
#include "redis/crc16.h"
}
#include <absl/strings/str_cat.h>
#include <absl/strings/str_join.h>
#include "base/flags.h"
#include "base/logging.h"
#include "cluster_defs.h"
#include "slot_set.h"
#include "src/server/common.h"
using namespace std;
@ -15,6 +19,32 @@ ABSL_FLAG(string, cluster_mode, "",
"'emulated', 'yes' or ''");
namespace dfly::cluster {
std::string SlotRange::ToString() const {
return absl::StrCat("[", start, ", ", end, "]");
}
SlotRanges::SlotRanges(std::vector<SlotRange> ranges) : ranges_(std::move(ranges)) {
std::sort(ranges_.begin(), ranges_.end());
}
void SlotRanges::Merge(const SlotRanges& sr) {
// TODO rewrite it
SlotSet slots(*this);
slots.Set(sr, true);
ranges_ = std::move(slots.ToSlotRanges().ranges_);
}
std::string SlotRanges::ToString() const {
return absl::StrJoin(ranges_, ", ", [](std::string* out, SlotRange range) {
absl::StrAppend(out, range.ToString());
});
}
std::string MigrationInfo::ToString() const {
return absl::StrCat(node_info.id, ",", node_info.ip, ":", node_info.port, " (",
slot_ranges.ToString(), ")");
}
namespace {
enum class ClusterMode {
kUninitialized,


@ -4,10 +4,7 @@
#pragma once
#include <absl/strings/str_cat.h>
#include <absl/strings/str_join.h>
#include <memory>
#include <cstdint>
#include <string>
#include <string_view>
#include <vector>
@ -24,45 +21,85 @@ struct SlotRange {
SlotId start = 0;
SlotId end = 0;
bool operator==(const SlotRange& r) const {
bool operator==(const SlotRange& r) const noexcept {
return start == r.start && end == r.end;
}
bool IsValid() {
bool operator<(const SlotRange& r) const noexcept {
return start < r.start || (start == r.start && end < r.end);
}
bool IsValid() const noexcept {
return start <= end && start <= kMaxSlotId && end <= kMaxSlotId;
}
std::string ToString() const {
return absl::StrCat("[", start, ", ", end, "]");
bool Contains(SlotId id) const noexcept {
return id >= start && id <= end;
}
static std::string ToString(const std::vector<SlotRange>& ranges) {
return absl::StrJoin(ranges, ", ", [](std::string* out, SlotRange range) {
absl::StrAppend(out, range.ToString());
});
}
std::string ToString() const;
};
using SlotRanges = std::vector<SlotRange>;
class SlotRanges {
public:
SlotRanges() = default;
explicit SlotRanges(std::vector<SlotRange> ranges);
bool Contains(SlotId id) const noexcept {
for (const auto& sr : ranges_) {
if (sr.Contains(id))
return true;
}
return false;
}
size_t Size() const noexcept {
return ranges_.size();
}
bool Empty() const noexcept {
return ranges_.empty();
}
void Merge(const SlotRanges& sr);
bool operator==(const SlotRanges& r) const noexcept {
return ranges_ == r.ranges_;
}
std::string ToString() const;
auto begin() const noexcept {
return ranges_.cbegin();
}
auto end() const noexcept {
return ranges_.cend();
}
private:
std::vector<SlotRange> ranges_;
};
struct ClusterNodeInfo {
std::string id;
std::string ip;
uint16_t port = 0;
bool operator==(const ClusterNodeInfo& r) const noexcept {
return port == r.port && ip == r.ip && id == r.id;
}
};
struct MigrationInfo {
std::vector<SlotRange> slot_ranges;
std::string node_id;
std::string ip;
uint16_t port = 0;
SlotRanges slot_ranges;
ClusterNodeInfo node_info;
bool operator==(const MigrationInfo& r) const {
return ip == r.ip && port == r.port && slot_ranges == r.slot_ranges && node_id == r.node_id;
bool operator==(const MigrationInfo& r) const noexcept {
return node_info == r.node_info && slot_ranges == r.slot_ranges;
}
std::string ToString() const {
return absl::StrCat(node_id, ",", ip, ":", port, " (", SlotRange::ToString(slot_ranges), ")");
}
std::string ToString() const;
};
struct ClusterShardInfo {
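
Throughout the diff, call sites that used to splice raw vectors of ranges together (insert at end) now go through the new SlotRanges::Merge, which round-trips through SlotSet and therefore yields sorted, non-overlapping ranges. A stand-alone sketch of the intended semantics follows; MergeRanges is a hypothetical helper that returns the result, whereas the real SlotRanges::Merge mutates the object in place via the SlotSet bitset:

// Illustration of what merging slot ranges produces: the union of two range
// sets, normalized into sorted, non-overlapping (and non-adjacent) ranges.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

struct SlotRange {
  uint16_t start = 0, end = 0;
};

std::vector<SlotRange> MergeRanges(std::vector<SlotRange> a, const std::vector<SlotRange>& b) {
  a.insert(a.end(), b.begin(), b.end());
  std::sort(a.begin(), a.end(),
            [](const SlotRange& l, const SlotRange& r) { return l.start < r.start; });
  std::vector<SlotRange> out;
  for (const auto& r : a) {
    if (!out.empty() && r.start <= out.back().end + 1)  // overlapping or adjacent
      out.back().end = std::max(out.back().end, r.end);
    else
      out.push_back(r);
  }
  return out;
}

int main() {
  // Mirrors the SlotSetAPI test above: {0-2000, 5010} merged with {5000-5100}.
  for (const auto& r : MergeRanges({{0, 2000}, {5010, 5010}}, {{5000, 5100}}))
    std::cout << "[" << r.start << ", " << r.end << "] ";  // [0, 2000] [5000, 5100]
  std::cout << "\n";
}

This normalization is what lets DflyClusterConfig accumulate finished-migration slots with enable_slots.Merge / disable_slots.Merge instead of appending raw ranges to a vector.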


@ -79,7 +79,7 @@ ClusterConfig* ClusterFamily::cluster_config() {
}
ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
ClusterShardInfo info{.slot_ranges = {{.start = 0, .end = kMaxSlotNum}},
ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}),
.master = {},
.replicas = {},
.migrations = {}};
@ -160,7 +160,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, ConnectionContext* cntx)
rb->StartArray(kEntrySize);
rb->SendBulkString("slots");
rb->StartArray(shard.slot_ranges.size() * 2);
rb->StartArray(shard.slot_ranges.Size() * 2);
for (const auto& slot_range : shard.slot_ranges) {
rb->SendLong(slot_range.start);
rb->SendLong(slot_range.end);
@ -200,7 +200,7 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, ConnectionContext* cntx)
unsigned int slot_ranges = 0;
for (const auto& shard : config) {
slot_ranges += shard.slot_ranges.size();
slot_ranges += shard.slot_ranges.Size();
}
rb->StartArray(slot_ranges);
@ -237,7 +237,7 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, Connec
string result;
auto WriteNode = [&](const ClusterNodeInfo& node, string_view role, string_view master_id,
const vector<SlotRange>& ranges) {
const SlotRanges& ranges) {
absl::StrAppend(&result, node.id, " ");
absl::StrAppend(&result, node.ip, ":", node.port, "@", node.port, " ");
@ -312,7 +312,7 @@ void ClusterInfoImpl(const ClusterShardInfos& config, ConnectionContext* cntx) {
known_nodes += 1; // For master
known_nodes += shard_config.replicas.size();
if (!shard_config.slot_ranges.empty()) {
if (!shard_config.slot_ranges.Empty()) {
++cluster_size;
}
}
@ -436,7 +436,7 @@ void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) {
namespace {
void DeleteSlots(const SlotRanges& slots_ranges) {
if (slots_ranges.empty()) {
if (slots_ranges.Empty()) {
return;
}
@ -451,13 +451,13 @@ void DeleteSlots(const SlotRanges& slots_ranges) {
}
void WriteFlushSlotsToJournal(const SlotRanges& slot_ranges) {
if (slot_ranges.empty()) {
if (slot_ranges.Empty()) {
return;
}
// Build args
vector<string> args;
args.reserve(slot_ranges.size() + 1);
args.reserve(slot_ranges.Size() + 1);
args.push_back("FLUSHSLOTS");
for (SlotRange range : slot_ranges) {
args.push_back(absl::StrCat(range.start));
@ -517,14 +517,12 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
// set_config_mu is unlocked and even if we apply the same changes 2 times it's not a problem
for (const auto& m : incoming_migrations_jobs_) {
if (m->GetState() == MigrationState::C_FINISHED) {
const auto& slots = m->GetSlots();
enable_slots.insert(enable_slots.end(), slots.begin(), slots.end());
enable_slots.Merge(m->GetSlots());
}
}
for (const auto& m : outgoing_migration_jobs_) {
if (m->GetState() == MigrationState::C_FINISHED) {
const auto& slots = m->GetSlots();
disable_slots.insert(disable_slots.end(), slots.begin(), slots.end());
disable_slots.Merge(m->GetSlots());
}
}
}
@ -554,17 +552,16 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
DCHECK(tl_cluster_config != nullptr);
if (!tracker.Wait(absl::Seconds(1))) {
LOG(WARNING) << "Cluster config change timed out";
LOG(WARNING) << "Cluster config change timed for: " << MyID();
}
SlotSet after = tl_cluster_config->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges();
deleted_slots.insert(deleted_slots.end(), out_migrations_slots.begin(),
out_migrations_slots.end());
LOG_IF(INFO, !deleted_slots.empty())
<< "Flushing newly unowned slots: " << SlotRange::ToString(deleted_slots);
deleted_slots.Merge(out_migrations_slots);
DeleteSlots(deleted_slots);
LOG_IF(INFO, !deleted_slots.Empty())
<< "Flushing newly unowned slots: " << deleted_slots.ToString();
WriteFlushSlotsToJournal(deleted_slots);
}
@ -619,7 +616,7 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c
}
void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx) {
SlotRanges slot_ranges;
std::vector<SlotRange> slot_ranges;
CmdArgParser parser(args);
do {
@ -630,7 +627,7 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());
DeleteSlots(slot_ranges);
DeleteSlots(SlotRanges(std::move(slot_ranges)));
return cntx->SendOk();
}
@ -692,8 +689,8 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
m->GetErrorStr());
}
for (const auto& m : outgoing_migration_jobs_) {
append_answer("out", m->GetMigrationInfo().node_id, node_id, m->GetState(), m->GetKeyCount(),
m->GetErrorStr());
append_answer("out", m->GetMigrationInfo().node_info.id, node_id, m->GetState(),
m->GetKeyCount(), m->GetErrorStr());
}
if (reply.empty()) {
@ -741,14 +738,14 @@ SlotRanges ClusterFamily::RemoveOutgoingMigrations(shared_ptr<ClusterConfig> new
auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(),
[&m](const auto& om) {
// we can have only one migration per target-source pair
return m.node_id == om->GetMigrationInfo().node_id;
return m.node_info.id == om->GetMigrationInfo().node_info.id;
});
DCHECK(it != outgoing_migration_jobs_.end());
DCHECK(it->get() != nullptr);
OutgoingMigration& migration = *it->get();
const auto& slots = migration.GetSlots();
removed_slots.insert(removed_slots.end(), slots.begin(), slots.end());
LOG(INFO) << "Outgoing migration cancelled: slots " << SlotRange::ToString(slots) << " to "
removed_slots.Merge(slots);
LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to "
<< migration.GetHostIp() << ":" << migration.GetPort();
migration.Finish();
outgoing_migration_jobs_.erase(it);
@ -789,7 +786,7 @@ bool RemoveIncomingMigrationImpl(std::vector<std::shared_ptr<IncomingSlotMigrati
auto removed_ranges = removed.ToSlotRanges();
LOG_IF(WARNING, migration->GetState() == MigrationState::C_FINISHED)
<< "Flushing slots of removed FINISHED migration " << migration->GetSourceID()
<< ", slots: " << SlotRange::ToString(removed_ranges);
<< ", slots: " << removed_ranges.ToString();
DeleteSlots(removed_ranges);
}
@ -800,8 +797,8 @@ bool RemoveIncomingMigrationImpl(std::vector<std::shared_ptr<IncomingSlotMigrati
void ClusterFamily::RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations) {
lock_guard lk(migration_mu_);
for (const auto& m : migrations) {
RemoveIncomingMigrationImpl(incoming_migrations_jobs_, m.node_id);
VLOG(1) << "Migration was canceled from: " << m.node_id;
RemoveIncomingMigrationImpl(incoming_migrations_jobs_, m.node_info.id);
VLOG(1) << "Migration was canceled from: " << m.node_info.id;
}
}
@ -811,7 +808,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
auto [source_id, flows_num] = parser.Next<std::string, uint32_t>();
SlotRanges slots;
std::vector<SlotRange> slots;
do {
auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
slots.emplace_back(SlotRange{slot_start, slot_end});
@ -824,7 +821,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(),
[&](const MigrationInfo& info) {
// TODO: also compare slot ranges (in an order-agnostic way)
return info.node_id == source_id;
return info.node_info.id == source_id;
});
if (!found) {
VLOG(1) << "Unrecognized incoming migration from " << source_id;
@ -838,7 +835,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id;
incoming_migrations_jobs_.emplace_back(make_shared<IncomingSlotMigration>(
std::move(source_id), &server_family_->service(), std::move(slots), flows_num));
std::move(source_id), &server_family_->service(), SlotRanges(std::move(slots)), flows_num));
return cntx->SendOk();
}
@ -881,8 +878,11 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
const SlotRanges& slots, bool is_incoming) {
VLOG(1) << "Update config for slots ranges: " << slots.ToString() << " for " << MyID() << " : "
<< node_id;
lock_guard gu(set_config_mu);
lock_guard lk(migration_mu_);
bool is_migration_valid = false;
if (is_incoming) {
for (const auto& mj : incoming_migrations_jobs_) {
@ -893,14 +893,17 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
}
} else {
for (const auto& mj : outgoing_migration_jobs_) {
if (mj->GetMigrationInfo().node_id == node_id) {
if (mj->GetMigrationInfo().node_info.id == node_id) {
// TODO add compare for slots
is_migration_valid = true;
}
}
}
if (!is_migration_valid)
if (!is_migration_valid) {
LOG(WARNING) << "Config wasn't updated for slots ranges: " << slots.ToString() << " for "
<< MyID() << " : " << node_id;
return;
}
auto new_config = is_incoming ? tl_cluster_config->CloneWithChanges(slots, {})
: tl_cluster_config->CloneWithChanges({}, slots);
@ -910,6 +913,8 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
server_family_->service().proactor_pool().AwaitFiberOnAll(
[&new_config](util::ProactorBase*) { tl_cluster_config = new_config; });
DCHECK(tl_cluster_config != nullptr);
VLOG(1) << "Config is updated for slots ranges: " << slots.ToString() << " for " << MyID()
<< " : " << node_id;
}
void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
@ -923,7 +928,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "DFLYMIGRATE ACK" << args;
auto in_migrations = tl_cluster_config->GetIncomingMigrations();
auto m_it = std::find_if(in_migrations.begin(), in_migrations.end(),
[source_id](const auto& m) { return m.node_id == source_id; });
[source_id](const auto& m) { return m.node_info.id == source_id; });
if (m_it == in_migrations.end()) {
LOG(WARNING) << "migration isn't in config";
return cntx->SendLong(OutgoingMigration::kInvalidAttempt);
@ -937,10 +942,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError("Join timeout happened");
}
VLOG(1) << "Migration is joined for " << source_id;
ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true);
VLOG(1) << "Config is updated for " << MyID();
return cntx->SendLong(attempt);
}


@ -144,7 +144,7 @@ bool IncomingSlotMigration::Join() {
void IncomingSlotMigration::Stop() {
string_view log_state = state_.load() == MigrationState::C_FINISHED ? "Finishing" : "Cancelling";
LOG(INFO) << log_state << " incoming migration of slots " << SlotRange::ToString(slots_);
LOG(INFO) << log_state << " incoming migration of slots " << slots_.ToString();
cntx_.Cancel();
for (auto& flow : shard_flows_) {
@ -156,11 +156,10 @@ void IncomingSlotMigration::Stop() {
}
void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
VLOG(1) << "Start flow for shard: " << shard;
state_.store(MigrationState::C_SYNC);
shard_flows_[shard]->Start(&cntx_, source, bc_);
VLOG(1) << "Incoming slot migration flow for shard: " << shard << " finished";
VLOG(1) << "Incoming flow: " << shard << " finished for " << source_id_;
}
size_t IncomingSlotMigration::GetKeyCount() const {


@ -78,7 +78,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
};
OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, ServerFamily* sf)
: ProtocolClient(info.ip, info.port),
: ProtocolClient(info.node_info.ip, info.node_info.port),
migration_info_(std::move(info)),
slot_migrations_(shard_set->size()),
server_family_(sf),
@ -107,6 +107,8 @@ bool OutgoingMigration::ChangeState(MigrationState new_state) {
}
void OutgoingMigration::Finish(bool is_error) {
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : "
<< migration_info_.node_info.id;
bool should_cancel_flows = false;
{
@ -217,20 +219,18 @@ void OutgoingMigration::SyncFb() {
});
if (CheckFlowsForErrors()) {
VLOG(1) << "Errors detected, retrying outgoing migration";
LOG(WARNING) << "Errors detected, retrying outgoing migration";
continue;
}
VLOG(2) << "Migrations snapshot is finished";
long attempt = 0;
while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
// process commands that were on pause and try again
VLOG(2) << "Waiting for migration to finalize...";
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
}
if (CheckFlowsForErrors()) {
VLOG(1) << "Errors detected, retrying outgoing migration";
LOG(WARNING) << "Errors detected, retrying outgoing migration";
continue;
}
break;
@ -242,6 +242,7 @@ void OutgoingMigration::SyncFb() {
bool OutgoingMigration::FinalizeMigration(long attempt) {
// if it's not the 1st attempt and flows are work correctly we try to reconnect and ACK one more
// time
VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
if (attempt > 1) {
if (CheckFlowsForErrors()) {
Finish(true);
@ -271,11 +272,11 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
auto cb = [this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
VLOG(1) << "FINALIZE outgoing migration" << shard->shard_id();
slot_migrations_[shard->shard_id()]->Finalize();
}
};
VLOG(1) << "FINALIZE flows for " << cf_->MyID() << " : " << migration_info_.node_info.id;
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt);
@ -309,9 +310,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
Finish(is_error);
if (!is_error) {
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_id, migration_info_.slot_ranges,
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
false);
VLOG(1) << "Config is updated for " << cf_->MyID();
}
return true;
}


@ -68,7 +68,7 @@ class SlotSet {
}
SlotRanges ToSlotRanges() const {
SlotRanges res;
std::vector<SlotRange> res;
for (SlotId i = 0; i < kSlotsNumber; ++i) {
if (!slots_->test(i)) {
@ -81,7 +81,7 @@ class SlotSet {
}
}
return res;
return SlotRanges(res);
}
private:
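
SlotSet::ToSlotRanges now returns the SlotRanges wrapper instead of a raw vector; the conversion itself is unchanged: scan the per-slot bitset and emit one range per contiguous run of owned slots. A simplified stand-alone sketch of that scan (stand-in types; the real member accesses the bitset through slots_ and the slot count of 16384 matches kMaxSlotNum + 1):

// Stand-alone sketch of converting a slot bitset into slot ranges.
#include <bitset>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

using SlotId = uint16_t;
constexpr size_t kSlotsNumber = 16384;

struct SlotRange {
  SlotId start = 0, end = 0;
};

std::vector<SlotRange> ToSlotRanges(const std::bitset<kSlotsNumber>& slots) {
  std::vector<SlotRange> res;
  for (size_t i = 0; i < kSlotsNumber; ++i) {
    if (!slots.test(i))
      continue;
    size_t start = i;  // start of a contiguous run of owned slots
    while (i + 1 < kSlotsNumber && slots.test(i + 1))
      ++i;  // extend the run
    res.push_back({static_cast<SlotId>(start), static_cast<SlotId>(i)});
  }
  return res;
}

int main() {
  std::bitset<kSlotsNumber> owned;
  for (size_t i = 5001; i <= 10000; ++i)  // the kMyId shard from the test above
    owned.set(i);
  auto ranges = ToSlotRanges(owned);
  std::cout << ranges.size() << " range(s): [" << ranges[0].start << ", " << ranges[0].end
            << "]\n";  // 1 range(s): [5001, 10000]
}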


@ -1041,7 +1041,7 @@ async def test_cluster_flushall_during_migration(
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
vmodule="cluster_family=9,cluster_slot_migration=9,outgoing_slot_migration=9",
vmodule="cluster_family=9,cluster_slot_migration=9,outgoing_slot_migration=9,incoming_slot_migration=9",
logtostdout=True,
)
for i in range(2)
@ -1265,7 +1265,7 @@ async def test_cluster_fuzzymigration(
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
vmodule="cluster_family=9,cluster_slot_migration=9",
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
)
for i in range(node_count)
]
@ -1292,9 +1292,6 @@ async def test_cluster_fuzzymigration(
await client.lpush(key, i)
except asyncio.exceptions.CancelledError:
return
# TODO find the reason of TTL exhausted error and is it possible to fix it
except redis.exceptions.ClusterError:
return
# Start ten counters
counter_keys = [f"_counter{i}" for i in range(10)]
@ -1347,7 +1344,8 @@ async def test_cluster_fuzzymigration(
res = True
for node in nodes:
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
logging.debug(states)
if states != "NO_STATE":
logging.debug(states)
for state in states:
parsed_state = re.search("([a-z]+) ([a-z0-9]+) ([A-Z]+)", state)
if parsed_state == None: