1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: Hide replica info in real cluster if --managed_service_info (#4241)

So far we only handled emulated cluster. This PR adds real cluster
support.

Related to #4173
This commit is contained in:
Shahar Mike 2024-12-02 20:22:07 +02:00 committed by GitHub
parent 935ae86c94
commit b0d633fb61
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 161 additions and 4 deletions

View file

@ -65,6 +65,27 @@ constexpr string_view kClusterDisabled =
thread_local shared_ptr<ClusterConfig> tl_cluster_config; thread_local shared_ptr<ClusterConfig> tl_cluster_config;
ClusterShardInfos GetConfigForStats(ConnectionContext* cntx) {
CHECK(!IsClusterEmulated());
CHECK(tl_cluster_config != nullptr);
auto config = tl_cluster_config->GetConfig();
if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
return config;
}
// We can't mutate `config` so we copy it over
std::vector<ClusterShardInfo> infos;
infos.reserve(config.size());
for (auto& node : config) {
infos.push_back(node);
infos.rbegin()->replicas.clear();
}
return ClusterShardInfos{std::move(infos)};
}
} // namespace } // namespace
ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(server_family) { ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(server_family) {
@ -210,7 +231,7 @@ void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext*
if (IsClusterEmulated()) { if (IsClusterEmulated()) {
return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, builder); return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) { } else if (tl_cluster_config != nullptr) {
return ClusterShardsImpl(tl_cluster_config->GetConfig(), builder); return ClusterShardsImpl(GetConfigForStats(cntx), builder);
} else { } else {
return builder->SendError(kClusterNotConfigured); return builder->SendError(kClusterNotConfigured);
} }
@ -255,7 +276,7 @@ void ClusterFamily::ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* c
if (IsClusterEmulated()) { if (IsClusterEmulated()) {
return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, builder); return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) { } else if (tl_cluster_config != nullptr) {
return ClusterSlotsImpl(tl_cluster_config->GetConfig(), builder); return ClusterSlotsImpl(GetConfigForStats(cntx), builder);
} else { } else {
return builder->SendError(kClusterNotConfigured); return builder->SendError(kClusterNotConfigured);
} }
@ -311,7 +332,7 @@ void ClusterFamily::ClusterNodes(SinkReplyBuilder* builder, ConnectionContext* c
if (IsClusterEmulated()) { if (IsClusterEmulated()) {
return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, builder); return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, builder);
} else if (tl_cluster_config != nullptr) { } else if (tl_cluster_config != nullptr) {
return ClusterNodesImpl(tl_cluster_config->GetConfig(), id_, builder); return ClusterNodesImpl(GetConfigForStats(cntx), id_, builder);
} else { } else {
return builder->SendError(kClusterNotConfigured); return builder->SendError(kClusterNotConfigured);
} }
@ -375,7 +396,7 @@ void ClusterFamily::ClusterInfo(SinkReplyBuilder* builder, ConnectionContext* cn
if (IsClusterEmulated()) { if (IsClusterEmulated()) {
return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, builder); return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) { } else if (tl_cluster_config != nullptr) {
return ClusterInfoImpl(tl_cluster_config->GetConfig(), builder); return ClusterInfoImpl(GetConfigForStats(cntx), builder);
} else { } else {
return ClusterInfoImpl({}, builder); return ClusterInfoImpl({}, builder);
} }

View file

@ -815,6 +815,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("replication_timeout"); config_registry.RegisterMutable("replication_timeout");
config_registry.RegisterMutable("table_growth_margin"); config_registry.RegisterMutable("table_growth_margin");
config_registry.RegisterMutable("tcp_keepalive"); config_registry.RegisterMutable("tcp_keepalive");
config_registry.RegisterMutable("managed_service_info");
config_registry.RegisterMutable( config_registry.RegisterMutable(
"notify_keyspace_events", [pool = &pp_](const absl::CommandLineFlag& flag) { "notify_keyspace_events", [pool = &pp_](const absl::CommandLineFlag& flag) {

View file

@ -1,4 +1,5 @@
import pytest import pytest
import copy
import re import re
import json import json
import redis import redis
@ -376,6 +377,140 @@ async def test_emulated_cluster_with_replicas(df_factory):
} }
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_managed_service_info(df_factory):
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 100)
replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 101)
df_factory.start_all([master, replica])
c_master = master.client()
c_master_admin = master.admin_client()
master_id = await c_master.execute_command("CLUSTER MYID")
c_replica = replica.client()
c_replica_admin = replica.admin_client()
replica_id = await c_replica.execute_command("CLUSTER MYID")
# Connect replicas to master
rc = await c_replica_admin.execute_command(f"REPLICAOF localhost {master.port}")
assert rc == "OK"
await wait_available_async(c_replica)
nodes = [await create_node_info(master)]
nodes[0].slots = [(0, 16383)]
nodes[0].replicas = [await create_node_info(replica)]
await push_config(json.dumps(generate_config(nodes)), [master.client(), replica.client()])
expected_hidden_cluster_slots = [
[
0,
16383,
[
"127.0.0.1",
master.port,
master_id,
],
],
]
expected_full_cluster_slots = copy.deepcopy(expected_hidden_cluster_slots)
expected_full_cluster_slots[0].append(
[
"127.0.0.1",
replica.port,
replica_id,
]
)
assert await c_master.execute_command("CLUSTER SLOTS") == expected_full_cluster_slots
assert await c_master_admin.execute_command("CLUSTER SLOTS") == expected_full_cluster_slots
expected_hidden_cluster_nodes = {
f"127.0.0.1:{master.port}": {
"connected": True,
"epoch": "0",
"flags": "myself,master",
"last_ping_sent": "0",
"last_pong_rcvd": "0",
"master_id": "-",
"migrations": [],
"node_id": master_id,
"slots": [["0", "16383"]],
},
}
expected_full_cluster_nodes = copy.deepcopy(expected_hidden_cluster_nodes)
expected_full_cluster_nodes[f"127.0.0.1:{replica.port}"] = {
"connected": True,
"epoch": "0",
"flags": "slave",
"last_ping_sent": "0",
"last_pong_rcvd": "0",
"master_id": master_id,
"migrations": [],
"node_id": replica_id,
"slots": [],
}
assert await c_master.execute_command("CLUSTER NODES") == expected_full_cluster_nodes
assert await c_master_admin.execute_command("CLUSTER NODES") == expected_full_cluster_nodes
expected_hidden_cluster_shards = [
[
"slots",
[0, 16383],
"nodes",
[
[
"id",
master_id,
"endpoint",
"127.0.0.1",
"ip",
"127.0.0.1",
"port",
master.port,
"role",
"master",
"replication-offset",
0,
"health",
"online",
],
],
],
]
expected_full_cluster_shards = copy.deepcopy(expected_hidden_cluster_shards)
expected_full_cluster_shards[0][3].append(
[
"id",
replica_id,
"endpoint",
"127.0.0.1",
"ip",
"127.0.0.1",
"port",
replica.port,
"role",
"replica",
"replication-offset",
0,
"health",
"online",
]
)
assert await c_master.execute_command("CLUSTER SHARDS") == expected_full_cluster_shards
assert await c_master_admin.execute_command("CLUSTER SHARDS") == expected_full_cluster_shards
await c_master.execute_command("config set managed_service_info true")
assert await c_master.execute_command("CLUSTER SLOTS") == expected_hidden_cluster_slots
assert await c_master_admin.execute_command("CLUSTER SLOTS") == expected_full_cluster_slots
assert await c_master.execute_command("CLUSTER NODES") == expected_hidden_cluster_nodes
assert await c_master_admin.execute_command("CLUSTER NODES") == expected_full_cluster_nodes
assert await c_master.execute_command("CLUSTER SHARDS") == expected_hidden_cluster_shards
assert await c_master_admin.execute_command("CLUSTER SHARDS") == expected_full_cluster_shards
@dfly_args({"cluster_mode": "emulated"}) @dfly_args({"cluster_mode": "emulated"})
async def test_cluster_info(async_client): async def test_cluster_info(async_client):
res = await async_client.execute_command("CLUSTER INFO") res = await async_client.execute_command("CLUSTER INFO")