1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat: introduce a multi-shard emulated cluster

This commit is contained in:
Roman Gershman 2024-05-29 17:19:37 +03:00
parent b89997c5ba
commit 19e0721ab9
No known key found for this signature in database
GPG key ID: F25B77EAF8AEBA7A
2 changed files with 42 additions and 29 deletions

View file

@ -28,6 +28,7 @@ ABSL_FLAG(std::string, cluster_announce_ip, "", "ip that cluster commands announ
ABSL_FLAG(std::string, cluster_node_id, "",
"ID within a cluster, used for slot assignment. MUST be unique. If empty, uses master "
"replication ID (random string)");
ABSL_FLAG(uint32_t, cluster_shard_factor, 1, "number of virtual shards per node");
ABSL_DECLARE_FLAG(int32_t, port);
@ -78,38 +79,49 @@ ClusterConfig* ClusterFamily::cluster_config() {
return tl_cluster_config.get();
}
ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
ClusterShardInfo info{.slot_ranges = {{.start = 0, .end = kMaxSlotNum}},
.master = {},
.replicas = {},
.migrations = {}};
ClusterShardInfos ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
unsigned factor = std::max(absl::GetFlag(FLAGS_cluster_shard_factor), 1u);
unsigned slots_per_shard = (kMaxSlotNum + 1) / factor;
SlotId next_slot = 0;
ClusterShardInfos result;
optional<Replica::Info> replication_info = server_family_->GetReplicaInfo();
ServerState& etl = *ServerState::tlocal();
if (!replication_info.has_value()) {
DCHECK(etl.is_master);
std::string cluster_announce_ip = absl::GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->conn()->LocalBindAddress() : cluster_announce_ip;
for (unsigned j = 0; j < factor; ++j) {
SlotId end_slot = (j + 1) == factor ? kMaxSlotNum : SlotId(next_slot + slots_per_shard - 1);
info.master = {.id = id_,
.ip = preferred_endpoint,
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))};
ClusterShardInfo info{.slot_ranges = {{.start = next_slot, .end = end_slot}},
.master = {},
.replicas = {},
.migrations = {}};
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)});
optional<Replica::Info> replication_info = server_family_->GetReplicaInfo();
ServerState& etl = *ServerState::tlocal();
if (!replication_info.has_value()) {
DCHECK(etl.is_master);
std::string cluster_announce_ip = absl::GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->conn()->LocalBindAddress() : cluster_announce_ip;
info.master = {.id = (j == 0) ? id_ : absl::StrCat("emulated", j),
.ip = preferred_endpoint,
// +j for py-cluster to work.
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))};
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)});
}
} else {
// TODO: We currently don't save the master's ID in the replica
info.master = {.id = "", .ip = replication_info->host, .port = replication_info->port};
info.replicas.push_back({.id = id_,
.ip = cntx->conn()->LocalBindAddress(),
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))});
}
} else {
// TODO: We currently don't save the master's ID in the replica
info.master = {.id = "", .ip = replication_info->host, .port = replication_info->port};
info.replicas.push_back({.id = id_,
.ip = cntx->conn()->LocalBindAddress(),
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))});
next_slot = info.slot_ranges.front().end + 1;
result.push_back(info);
}
return info;
return result;
}
void ClusterFamily::ClusterHelp(ConnectionContext* cntx) {
@ -222,7 +234,7 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, ConnectionContext* cntx)
void ClusterFamily::ClusterSlots(ConnectionContext* cntx) {
if (IsClusterEmulated()) {
return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, cntx);
return ClusterSlotsImpl(GetEmulatedShardInfo(cntx), cntx);
} else if (tl_cluster_config != nullptr) {
return ClusterSlotsImpl(tl_cluster_config->GetConfig(), cntx);
} else {
@ -231,6 +243,7 @@ void ClusterFamily::ClusterSlots(ConnectionContext* cntx) {
}
namespace {
void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, ConnectionContext* cntx) {
// For more details https://redis.io/commands/cluster-nodes/

View file

@ -94,7 +94,7 @@ class ClusterFamily {
ABSL_GUARDED_BY(migration_mu_);
private:
ClusterShardInfo GetEmulatedShardInfo(ConnectionContext* cntx) const;
ClusterShardInfos GetEmulatedShardInfo(ConnectionContext* cntx) const;
mutable util::fb2::Mutex config_update_mu_;