1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

refactor: remove start-slot-migration cmd #2727 (#2728)

* refactor: remove start-slot-migration cmd #2727
This commit is contained in:
Borys 2024-03-21 13:37:05 +02:00 committed by GitHub
parent 31fabf2182
commit d6b7df94bb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 161 additions and 158 deletions

View file

@ -408,8 +408,6 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return DflyClusterMyId(args, cntx);
} else if (sub_cmd == "FLUSHSLOTS") {
return DflyClusterFlushSlots(args, cntx);
} else if (sub_cmd == "START-SLOT-MIGRATION") {
return DflyClusterStartSlotMigration(args, cntx);
} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
return DflyClusterSlotMigrationStatus(args, cntx);
}
@ -526,7 +524,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
DCHECK(tl_cluster_config != nullptr);
// TODO rewrite with outgoing migrations
if (!StartSlotMigrations(new_config->GetIncomingMigrations(), cntx)) {
if (!StartSlotMigrations(new_config->GetOutgoingMigrations(), cntx)) {
return cntx->SendError("Can't start the migration");
}
RemoveFinishedMigrations();
@ -607,36 +605,12 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn
return cntx->SendOk();
}
// Handler for the DFLYCLUSTER START-SLOT-MIGRATION sub-command.
// Expected arguments: <host_ip> <port> followed by one or more <slot_start> <slot_end> pairs.
// Registers a migration for the given endpoint via AddMigration() and starts it.
// Replies with the migration's sync id, or with an error when the arguments fail to parse or
// AddMigration() returns null.
void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser(args);
auto [host_ip, port] = parser.Next<std::string_view, uint16_t>();
std::vector<SlotRange> slots;
// do/while: the first slot pair is read unconditionally, so at least one range is required.
// Presumably a missing or malformed pair is recorded by the parser and surfaces through
// parser.Error() below - confirm against CmdArgParser.
do {
auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
slots.emplace_back(SlotRange{slot_start, slot_end});
} while (parser.HasNext());
// Parse errors are checked once, after all arguments have been consumed.
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());
auto* node = AddMigration(std::string(host_ip), port, std::move(slots));
if (!node) {
return cntx->SendError("Can't start the migration, another one is in progress");
}
// NOTE(review): the std::error_code returned by Start() is ignored here; the sync id is sent
// regardless of whether Start() succeeded.
node->Start(cntx);
return cntx->SendLong(node->GetSyncId());
}
bool ClusterFamily::StartSlotMigrations(const std::vector<ClusterConfig::MigrationInfo>& migrations,
ConnectionContext* cntx) {
// Add validating and error processing
for (auto m : migrations) {
auto* node = AddMigration(m.ip, m.port, m.slot_ranges);
if (!node) {
return false;
}
node->Start(cntx);
auto outgoing_migration = CreateOutgoingMigration(m.ip, m.port, m.slot_ranges);
outgoing_migration->Start(cntx);
}
return true;
}
@ -704,8 +678,8 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
args.remove_prefix(1);
if (sub_cmd == "CONF") {
MigrationConf(args, cntx);
if (sub_cmd == "INIT") {
InitMigration(args, cntx);
} else if (sub_cmd == "FLOW") {
DflyMigrateFlow(args, cntx);
} else if (sub_cmd == "ACK") {
@ -715,8 +689,8 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
}
}
ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t port,
SlotRanges slots) {
ClusterSlotMigration* ClusterFamily::CreateIncomingMigration(std::string host_ip, uint16_t port,
SlotRanges slots) {
lock_guard lk(migration_mu_);
for (const auto& mj : incoming_migrations_jobs_) {
if (auto info = mj->GetInfo(); info.host == host_ip && info.port == port) {
@ -745,10 +719,10 @@ void ClusterFamily::RemoveFinishedMigrations() {
}
}
void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Create slot migration config " << args;
void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Create incoming migration, args: " << args;
CmdArgParser parser{args};
auto port = parser.Next<uint16_t>();
auto [sync_id, port, flows_num] = parser.Next<uint32_t, uint32_t, uint32_t>();
SlotRanges slots;
do {
@ -759,32 +733,18 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());
if (!tl_cluster_config) {
return cntx->SendError(kClusterNotConfigured);
}
auto* node =
CreateIncomingMigration(cntx->conn()->RemoteEndpointAddress(), port, std::move(slots));
for (const auto& migration_range : slots) {
for (auto i = migration_range.start; i <= migration_range.end; ++i) {
if (!tl_cluster_config->IsMySlot(i)) {
VLOG(1) << "Invalid migration slot " << i << " in range " << migration_range.start << ':'
<< migration_range.end;
return cntx->SendError("Invalid slots range");
}
}
}
VLOG(1) << "Init migration " << cntx->conn()->RemoteEndpointAddress() << ":" << port;
node->Init(sync_id, flows_num);
auto sync_id = CreateOutgoingMigration(cntx, port, std::move(slots));
cntx->conn()->SetName("slot_migration_ctrl");
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
rb->StartArray(2);
rb->SendLong(sync_id);
rb->SendLong(shard_set->size());
return;
return cntx->SendOk();
}
uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,
SlotRanges slots) {
std::shared_ptr<OutgoingMigration> ClusterFamily::CreateOutgoingMigration(std::string host,
uint16_t port,
SlotRanges slots) {
std::lock_guard lk(migration_mu_);
auto sync_id = next_sync_id_++;
auto err_handler = [](const GenericError& err) {
@ -793,11 +753,11 @@ uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_
// Todo add error processing, stop migration process
// fb2::Fiber("stop_Migration", &ClusterFamily::StopMigration, this, sync_id).Detach();
};
auto migration = make_shared<OutgoingMigration>(cntx->conn()->RemoteEndpointAddress(), port,
std::move(slots), err_handler, server_family_);
auto [it, inserted] = outgoing_migration_jobs_.emplace(sync_id, std::move(migration));
auto migration = make_shared<OutgoingMigration>(host, port, std::move(slots), sync_id,
err_handler, server_family_);
auto [it, inserted] = outgoing_migration_jobs_.emplace(sync_id, migration);
CHECK(inserted);
return sync_id;
return migration;
}
void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
@ -824,7 +784,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
EngineShard* shard = EngineShard::tlocal();
DCHECK(shard->shard_id() == shard_id);
migration->StartFlow(sync_id, server_family_->journal(), cntx->conn()->socket());
migration->StartFlow(server_family_->journal(), cntx->conn()->socket());
}
void ClusterFamily::FinalizeIncomingMigration(uint32_t local_sync_id) {

View file

@ -52,17 +52,13 @@ class ClusterFamily {
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);
private: // Slots migration section
void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx);
void DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);
// DFLYMIGRATE is an internal command that defines several steps in the slot migration process
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);
// DFLYMIGRATE CONF initiate first step in slots migration procedure
// MigrationConf process this request and saving slots range and
// target node port in outgoing_migration_jobs_.
// return sync_id and shard number to the target node
void MigrationConf(CmdArgList args, ConnectionContext* cntx);
// DFLYMIGRATE INIT is an internal command that creates an incoming migration object
void InitMigration(CmdArgList args, ConnectionContext* cntx);
// DFLYMIGRATE FLOW initiates the second step in the slot migration procedure;
// this request should be made for every shard on the target node
@ -73,14 +69,16 @@ class ClusterFamily {
void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx);
// create a ClusterSlotMigration entity which will execute migration
ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port, SlotRanges slots);
ClusterSlotMigration* CreateIncomingMigration(std::string host_ip, uint16_t port,
SlotRanges slots);
bool StartSlotMigrations(const std::vector<ClusterConfig::MigrationInfo>& migrations,
ConnectionContext* cntx);
void RemoveFinishedMigrations();
// store info about migration and create unique session id
uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, SlotRanges slots);
std::shared_ptr<OutgoingMigration> CreateOutgoingMigration(std::string host_ip, uint16_t port,
SlotRanges slots);
std::shared_ptr<OutgoingMigration> GetOutgoingMigration(uint32_t sync_id);

View file

@ -53,56 +53,29 @@ ClusterSlotMigration::~ClusterSlotMigration() {
sync_fb_.JoinIfNeeded();
}
error_code ClusterSlotMigration::Start(ConnectionContext* cntx) {
VLOG(1) << "Starting slot migration";
auto check_connection_error = [&cntx](error_code ec, const char* msg) -> error_code {
if (ec) {
cntx->SendError(absl::StrCat(msg, ec.message()));
}
return ec;
};
VLOG(1) << "Resolving host DNS";
error_code ec = ResolveHostDns();
RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns"));
VLOG(1) << "Connecting to source";
ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_);
RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source"));
error_code ClusterSlotMigration::Init(uint32_t sync_id, uint32_t shards_num) {
VLOG(1) << "Init slot migration";
state_ = MigrationState::C_CONNECTING;
VLOG(1) << "Greeting";
ec = Greet();
RETURN_ON_ERR(check_connection_error(ec, "couldn't greet source "));
sync_id_ = sync_id;
source_shards_num_ = shards_num;
VLOG(1) << "Resolving host DNS";
error_code ec = ResolveHostDns();
if (ec)
return ec;
VLOG(1) << "Connecting to source";
ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_);
if (ec)
return ec;
ResetParser(false);
sync_fb_ = fb2::Fiber("main_migration", &ClusterSlotMigration::MainMigrationFb, this);
return {};
}
error_code ClusterSlotMigration::Greet() {
ResetParser(false);
VLOG(1) << "greeting message handling";
RETURN_ON_ERR(SendCommandAndReadResponse("PING"));
PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("PONG"));
auto port = absl::GetFlag(FLAGS_port);
auto cmd = absl::StrCat("DFLYMIGRATE CONF ", port);
for (const auto& s : slots_) {
absl::StrAppend(&cmd, " ", s.start, " ", s.end);
}
VLOG(1) << "Migration command: " << cmd;
RETURN_ON_ERR(SendCommandAndReadResponse(cmd));
// Response is: sync_id, num_shards
if (!CheckRespFirstTypes({RespExpr::INT64, RespExpr::INT64}))
return make_error_code(errc::bad_message);
sync_id_ = get<int64_t>(LastResponseArgs()[0].u);
source_shards_num_ = get<int64_t>(LastResponseArgs()[1].u);
return error_code{};
return ec;
}
ClusterSlotMigration::Info ClusterSlotMigration::GetInfo() const {

View file

@ -26,7 +26,9 @@ class ClusterSlotMigration : private ProtocolClient {
~ClusterSlotMigration();
// Initiate connection with source node and create migration fiber
std::error_code Start(ConnectionContext* cntx);
// will be refactored in the future
std::error_code Init(uint32_t sync_id, uint32_t shards_num);
Info GetInfo() const;
uint32_t GetSyncId() const {
return sync_id_;
@ -49,8 +51,6 @@ class ClusterSlotMigration : private ProtocolClient {
}
private:
// Send DFLYMIGRATE CONF to the source and get info about migration process
std::error_code Greet();
void MainMigrationFb();
// Creates flows, one per shard on the source node and manage migration process
std::error_code InitiateSlotsMigration();

View file

@ -4,15 +4,22 @@
#include "server/cluster/outgoing_slot_migration.h"
#include <absl/flags/flag.h>
#include <atomic>
#include "absl/cleanup/cleanup.h"
#include "base/logging.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/streamer.h"
#include "server/server_family.h"
ABSL_DECLARE_FLAG(int, source_connect_timeout_ms);
ABSL_DECLARE_FLAG(uint16_t, admin_port);
using namespace std;
using namespace util;
@ -52,9 +59,12 @@ class OutgoingMigration::SliceSlotMigration {
};
OutgoingMigration::OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots,
Context::ErrHandler err_handler, ServerFamily* sf)
: host_ip_(ip),
uint32_t sync_id, Context::ErrHandler err_handler,
ServerFamily* sf)
: ProtocolClient(ip, port),
host_ip_(ip),
port_(port),
sync_id_(sync_id),
slots_(slots),
cntx_(err_handler),
slot_migrations_(shard_set->size()),
@ -65,7 +75,7 @@ OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
}
void OutgoingMigration::StartFlow(uint32_t sync_id, journal::Journal* journal, io::Sink* dest) {
void OutgoingMigration::StartFlow(journal::Journal* journal, io::Sink* dest) {
EngineShard* shard = EngineShard::tlocal();
DbSlice* slice = &shard->db_slice();
@ -75,7 +85,7 @@ void OutgoingMigration::StartFlow(uint32_t sync_id, journal::Journal* journal, i
{
std::lock_guard lck(flows_mu_);
slot_migrations_[shard_id] =
std::make_unique<SliceSlotMigration>(slice, slots_, sync_id, journal, &cntx_, dest);
std::make_unique<SliceSlotMigration>(slice, slots_, sync_id_, journal, &cntx_, dest);
state = GetStateImpl();
}
@ -143,4 +153,35 @@ void OutgoingMigration::SyncFb() {
// TODO add ACK here and config update
}
// Starts the outgoing migration: resolves and connects to the target node, then sends
//   DFLYMIGRATE INIT <sync_id> <admin_port> <flows_num> [<slot_start> <slot_end> ...]
// over that connection, where flows_num is slot_migrations_.size().
//
// DNS/connect failures are additionally reported to the client through cntx->SendError().
// Returns the first error encountered, errc::bad_message when the target does not answer INIT
// with a simple "OK", and a default-constructed (success) error_code otherwise.
std::error_code OutgoingMigration::Start(ConnectionContext* cntx) {
  VLOG(1) << "Starting outgoing migration";

  // Forwards a connection-stage failure to the client and passes the error code through.
  auto check_connection_error = [cntx](error_code ec, const char* msg) -> error_code {
    if (ec) {
      cntx->SendError(absl::StrCat(msg, ec.message()));
    }
    return ec;
  };

  VLOG(1) << "Resolving host DNS";
  error_code ec = ResolveHostDns();
  RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns: "));

  // The outgoing side connects to the migration *target*; the messages used to say "source".
  VLOG(1) << "Connecting to target";
  ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_);
  RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to target: "));

  VLOG(1) << "Migration initiating";
  ResetParser(false);
  const auto port = absl::GetFlag(FLAGS_admin_port);
  auto cmd = absl::StrCat("DFLYMIGRATE INIT ", sync_id_, " ", port, " ", slot_migrations_.size());
  for (const auto& s : slots_) {
    absl::StrAppend(&cmd, " ", s.start, " ", s.end);
  }
  RETURN_ON_ERR(SendCommandAndReadResponse(cmd));

  // A reply other than "OK" means the target did not accept the migration. Report it to the
  // caller instead of logging and returning success.
  if (!CheckRespIsSimpleReply("OK")) {
    const auto& resp = LastResponseArgs();
    if (resp.empty()) {
      // Guard: front() on an empty reply would be undefined behavior.
      LOG(ERROR) << "Empty reply to DFLYMIGRATE INIT";
    } else {
      LOG(ERROR) << facade::ToSV(resp.front().GetBuf());
    }
    return std::make_error_code(std::errc::bad_message);
  }

  return {};
}
} // namespace dfly

View file

@ -6,6 +6,7 @@
#include "io/io.h"
#include "server/cluster/cluster_config.h"
#include "server/common.h"
#include "server/protocol_client.h"
namespace dfly {
@ -17,15 +18,18 @@ class DbSlice;
class ServerFamily;
// Whole outgoing slots migration manager
class OutgoingMigration {
class OutgoingMigration : private ProtocolClient {
public:
OutgoingMigration() = default;
~OutgoingMigration();
OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots, Context::ErrHandler,
ServerFamily* sf);
OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots, uint32_t sync_id,
Context::ErrHandler, ServerFamily* sf);
// start migration process, sends INIT command to the target node
std::error_code Start(ConnectionContext* cntx);
// should be run for all shards
void StartFlow(uint32_t sync_id, journal::Journal* journal, io::Sink* dest);
void StartFlow(journal::Journal* journal, io::Sink* dest);
void Finalize(uint32_t shard_id);
void Cancel(uint32_t shard_id);
@ -54,6 +58,7 @@ class OutgoingMigration {
private:
std::string host_ip_;
uint16_t port_;
uint32_t sync_id_;
SlotRanges slots_;
Context cntx_;
mutable util::fb2::Mutex flows_mu_;

View file

@ -886,7 +886,7 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
"replicas": [],
"migrations": [{{ "slot_ranges": [ {{ "start": 5200, "end": 5259 }} ]
, "ip": "127.0.0.1", "port" : {nodes[0].admin_port}, "target_id": "{node_ids[1]}" }}]
, "ip": "127.0.0.1", "port" : {nodes[1].admin_port}, "target_id": "{node_ids[1]}" }}]
}},
{{
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
@ -901,24 +901,7 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)
await asyncio.sleep(0.5)
try:
await c_nodes_admin[1].execute_command(
"DFLYCLUSTER",
"START-SLOT-MIGRATION",
"127.0.0.1",
str(nodes[0].admin_port),
"5000",
"5200",
)
assert False, "Should not be able to start slot migration"
except redis.exceptions.ResponseError as e:
assert e.args[0] == "Can't start the migration, another one is in progress"
await push_config(
config.replace("LAST_SLOT_CUTOFF", "5199").replace("NEXT_SLOT_CUTOFF", "5200"),
c_nodes_admin,
)
# TODO: add a check for correct results after applying the same config again
await close_clients(*c_nodes, *c_nodes_admin)
@ -981,10 +964,27 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
assert await c_nodes[0].execute_command("DBSIZE") == 10
res = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "3000", "9000"
migation_config = f"""
[
{{
"slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
"replicas": [],
"migrations": [{{ "slot_ranges": [ {{ "start": 3000, "end": 9000 }} ]
, "ip": "127.0.0.1", "port" : {nodes[1].admin_port}, "target_id": "{node_ids[1]}" }}]
}},
{{
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
"master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
"replicas": []
}}
]
"""
await push_config(
migation_config.replace("LAST_SLOT_CUTOFF", "9000").replace("NEXT_SLOT_CUTOFF", "9001"),
c_nodes_admin,
)
assert 1 == res
await asyncio.sleep(0.5)
@ -1026,6 +1026,14 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
await close_clients(*c_nodes, *c_nodes_admin)
@dataclass
class MigrationInfo:
    """One outgoing migration of a node, emitted as an entry of "migrations" in the cluster config."""

    # Address of the node the slots are migrated to; the fuzzy-migration test passes
    # "127.0.0.1" and the destination instance's admin port.
    ip: str
    port: int
    # (start, end) slot pairs; each is rendered as {"start": s, "end": e} under "slot_ranges".
    slots: list
    # Node id of the migration destination.
    target_id: str
@dataclass
class NodeInfo:
instance: DflyInstance
@ -1033,7 +1041,8 @@ class NodeInfo:
admin_client: aioredis.Redis
slots: list
next_slots: list
sync_ids: list
migrations: list
id: str
@pytest.mark.skip(reason="Failing on github regression action")
@ -1069,7 +1078,8 @@ async def test_cluster_fuzzymigration(
admin_client=instance.admin_client(),
slots=[],
next_slots=[],
sync_ids=[],
migrations=[],
id=await get_node_id(instance.admin_client()),
)
for instance in instances
]
@ -1079,11 +1089,20 @@ async def test_cluster_fuzzymigration(
{
"slot_ranges": [{"start": s, "end": e} for (s, e) in node.slots],
"master": {
"id": await get_node_id(node.admin_client),
"id": node.id,
"ip": "127.0.0.1",
"port": node.instance.port,
},
"replicas": [],
"migrations": [
{
"slot_ranges": [{"start": s, "end": e} for (s, e) in m.slots],
"target_id": m.target_id,
"ip": m.ip,
"port": m.port,
}
for m in node.migrations
],
}
for node in nodes
]
@ -1146,20 +1165,22 @@ async def test_cluster_fuzzymigration(
continue
print(node_idx, "migrates to", dest_idx, "slots", dest_slots)
sync_id = await nodes[dest_idx].admin_client.execute_command(
"DFLYCLUSTER",
"START-SLOT-MIGRATION",
"127.0.0.1",
node.instance.admin_port,
*itertools.chain(*dest_slots),
node.migrations.append(
MigrationInfo(
ip="127.0.0.1",
port=nodes[dest_idx].instance.admin_port,
slots=dest_slots,
target_id=nodes[dest_idx].id,
)
)
nodes[node_idx].sync_ids.append(sync_id)
nodes[dest_idx].next_slots.extend(dest_slots)
keeping = node.slots[num_outgoing:]
node.next_slots.extend(keeping)
await push_config(json.dumps(await generate_config()), [node.admin_client for node in nodes])
iterations = 0
while True:
for node in nodes:
@ -1179,6 +1200,11 @@ async def test_cluster_fuzzymigration(
for counter in counters:
counter.cancel()
# clean migrations
for node in nodes:
node.migrations = []
# TODO this config should be pushed with new slots
# Push new config
await push_config(json.dumps(await generate_config()), [node.admin_client for node in nodes])