diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 1672bf0f7..698a49d8c 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -65,6 +65,27 @@ constexpr string_view kClusterDisabled = thread_local shared_ptr tl_cluster_config; +ClusterShardInfos GetConfigForStats(ConnectionContext* cntx) { + CHECK(!IsClusterEmulated()); + CHECK(tl_cluster_config != nullptr); + + auto config = tl_cluster_config->GetConfig(); + if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) { + return config; + } + + // We can't mutate `config` so we copy it over + std::vector infos; + infos.reserve(config.size()); + + for (auto& node : config) { + infos.push_back(node); + infos.rbegin()->replicas.clear(); + } + + return ClusterShardInfos{std::move(infos)}; +} + } // namespace ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(server_family) { @@ -210,7 +231,7 @@ void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* if (IsClusterEmulated()) { return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, builder); } else if (tl_cluster_config != nullptr) { - return ClusterShardsImpl(tl_cluster_config->GetConfig(), builder); + return ClusterShardsImpl(GetConfigForStats(cntx), builder); } else { return builder->SendError(kClusterNotConfigured); } @@ -255,7 +276,7 @@ void ClusterFamily::ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* c if (IsClusterEmulated()) { return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, builder); } else if (tl_cluster_config != nullptr) { - return ClusterSlotsImpl(tl_cluster_config->GetConfig(), builder); + return ClusterSlotsImpl(GetConfigForStats(cntx), builder); } else { return builder->SendError(kClusterNotConfigured); } @@ -311,7 +332,7 @@ void ClusterFamily::ClusterNodes(SinkReplyBuilder* builder, ConnectionContext* c if (IsClusterEmulated()) { return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, builder); } else if (tl_cluster_config != nullptr) { - return ClusterNodesImpl(tl_cluster_config->GetConfig(), id_, builder); + return ClusterNodesImpl(GetConfigForStats(cntx), id_, builder); } else { return builder->SendError(kClusterNotConfigured); } @@ -375,7 +396,7 @@ void ClusterFamily::ClusterInfo(SinkReplyBuilder* builder, ConnectionContext* cn if (IsClusterEmulated()) { return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, builder); } else if (tl_cluster_config != nullptr) { - return ClusterInfoImpl(tl_cluster_config->GetConfig(), builder); + return ClusterInfoImpl(GetConfigForStats(cntx), builder); } else { return ClusterInfoImpl({}, builder); } diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 938bde4bf..d1cb33853 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -815,6 +815,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterMutable("replication_timeout"); config_registry.RegisterMutable("table_growth_margin"); config_registry.RegisterMutable("tcp_keepalive"); + config_registry.RegisterMutable("managed_service_info"); config_registry.RegisterMutable( "notify_keyspace_events", [pool = &pp_](const absl::CommandLineFlag& flag) { diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 27fce9d43..879aa2917 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1,4 +1,5 @@ import pytest +import copy import re import json import redis @@ -376,6 +377,140 @@ async def test_emulated_cluster_with_replicas(df_factory): } +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +async def test_cluster_managed_service_info(df_factory): + master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 100) + replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 101) + + df_factory.start_all([master, replica]) + + c_master = master.client() + c_master_admin = master.admin_client() + master_id = await c_master.execute_command("CLUSTER MYID") + + c_replica = replica.client() + c_replica_admin = replica.admin_client() + replica_id = await c_replica.execute_command("CLUSTER MYID") + + # Connect replicas to master + rc = await c_replica_admin.execute_command(f"REPLICAOF localhost {master.port}") + assert rc == "OK" + await wait_available_async(c_replica) + + nodes = [await create_node_info(master)] + nodes[0].slots = [(0, 16383)] + nodes[0].replicas = [await create_node_info(replica)] + await push_config(json.dumps(generate_config(nodes)), [master.client(), replica.client()]) + + expected_hidden_cluster_slots = [ + [ + 0, + 16383, + [ + "127.0.0.1", + master.port, + master_id, + ], + ], + ] + expected_full_cluster_slots = copy.deepcopy(expected_hidden_cluster_slots) + expected_full_cluster_slots[0].append( + [ + "127.0.0.1", + replica.port, + replica_id, + ] + ) + assert await c_master.execute_command("CLUSTER SLOTS") == expected_full_cluster_slots + assert await c_master_admin.execute_command("CLUSTER SLOTS") == expected_full_cluster_slots + + expected_hidden_cluster_nodes = { + f"127.0.0.1:{master.port}": { + "connected": True, + "epoch": "0", + "flags": "myself,master", + "last_ping_sent": "0", + "last_pong_rcvd": "0", + "master_id": "-", + "migrations": [], + "node_id": master_id, + "slots": [["0", "16383"]], + }, + } + expected_full_cluster_nodes = copy.deepcopy(expected_hidden_cluster_nodes) + expected_full_cluster_nodes[f"127.0.0.1:{replica.port}"] = { + "connected": True, + "epoch": "0", + "flags": "slave", + "last_ping_sent": "0", + "last_pong_rcvd": "0", + "master_id": master_id, + "migrations": [], + "node_id": replica_id, + "slots": [], + } + assert await c_master.execute_command("CLUSTER NODES") == expected_full_cluster_nodes + assert await c_master_admin.execute_command("CLUSTER NODES") == expected_full_cluster_nodes + + expected_hidden_cluster_shards = [ + [ + "slots", + [0, 16383], + "nodes", + [ + [ + "id", + master_id, + "endpoint", + "127.0.0.1", + "ip", + "127.0.0.1", + "port", + master.port, + "role", + "master", + "replication-offset", + 0, + "health", + "online", + ], + ], + ], + ] + expected_full_cluster_shards = copy.deepcopy(expected_hidden_cluster_shards) + expected_full_cluster_shards[0][3].append( + [ + "id", + replica_id, + "endpoint", + "127.0.0.1", + "ip", + "127.0.0.1", + "port", + replica.port, + "role", + "replica", + "replication-offset", + 0, + "health", + "online", + ] + ) + assert await c_master.execute_command("CLUSTER SHARDS") == expected_full_cluster_shards + assert await c_master_admin.execute_command("CLUSTER SHARDS") == expected_full_cluster_shards + + await c_master.execute_command("config set managed_service_info true") + + assert await c_master.execute_command("CLUSTER SLOTS") == expected_hidden_cluster_slots + assert await c_master_admin.execute_command("CLUSTER SLOTS") == expected_full_cluster_slots + + assert await c_master.execute_command("CLUSTER NODES") == expected_hidden_cluster_nodes + assert await c_master_admin.execute_command("CLUSTER NODES") == expected_full_cluster_nodes + + assert await c_master.execute_command("CLUSTER SHARDS") == expected_hidden_cluster_shards + assert await c_master_admin.execute_command("CLUSTER SHARDS") == expected_full_cluster_shards + + @dfly_args({"cluster_mode": "emulated"}) async def test_cluster_info(async_client): res = await async_client.execute_command("CLUSTER INFO")