mirror of
https://github.com/prometheus-operator/prometheus-operator.git
synced 2025-04-21 03:38:43 +00:00
Merge pull request #6121 from simonpasquier/split-endpoints-controller
chore: create kubelet endpoints controller
This commit is contained in:
commit
1d0006317b
6 changed files with 401 additions and 304 deletions
cmd/operator
pkg
|
@ -43,6 +43,7 @@ import (
|
|||
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring"
|
||||
monitoringv1alpha1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1alpha1"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/kubelet"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
|
||||
prometheusagentcontroller "github.com/prometheus-operator/prometheus-operator/pkg/prometheus/agent"
|
||||
prometheuscontroller "github.com/prometheus-operator/prometheus-operator/pkg/prometheus/server"
|
||||
|
@ -102,6 +103,10 @@ var (
|
|||
tlsClientConfig rest.TLSClientConfig
|
||||
|
||||
serverConfig = server.DefaultConfig(":8080", false)
|
||||
|
||||
// Parameters for the kubelet endpoints controller.
|
||||
kubeletObject string
|
||||
kubeletSelector operator.LabelSelector
|
||||
)
|
||||
|
||||
func parseFlags(fs *flag.FlagSet) {
|
||||
|
@ -116,8 +121,8 @@ func parseFlags(fs *flag.FlagSet) {
|
|||
fs.StringVar(&tlsClientConfig.CAFile, "ca-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to TLS CA file.")
|
||||
fs.BoolVar(&tlsClientConfig.Insecure, "tls-insecure", false, "- NOT RECOMMENDED FOR PRODUCTION - Don't verify API server's CA certificate.")
|
||||
|
||||
fs.StringVar(&cfg.KubeletObject, "kubelet-service", "", "Service/Endpoints object to write kubelets into in format \"namespace/name\"")
|
||||
fs.Var(&cfg.KubeletSelector, "kubelet-selector", "Label selector to filter nodes.")
|
||||
fs.StringVar(&kubeletObject, "kubelet-service", "", "Service/Endpoints object to write kubelets into in format \"namespace/name\"")
|
||||
fs.Var(&kubeletSelector, "kubelet-selector", "Label selector to filter nodes.")
|
||||
|
||||
// The Prometheus config reloader image is released along with the
|
||||
// Prometheus Operator image, tagged with the same semver version. Default to
|
||||
|
@ -313,6 +318,23 @@ func run(fs *flag.FlagSet) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
var kec *kubelet.Controller
|
||||
if kubeletObject != "" {
|
||||
if kec, err = kubelet.New(
|
||||
log.With(logger, "component", "kubelet_endpoints"),
|
||||
restConfig,
|
||||
r,
|
||||
kubeletObject,
|
||||
kubeletSelector,
|
||||
cfg.Annotations,
|
||||
cfg.Labels,
|
||||
); err != nil {
|
||||
level.Error(logger).Log("msg", "instantiating kubelet endpoints controller failed", "err", err)
|
||||
cancel()
|
||||
return 1
|
||||
}
|
||||
}
|
||||
|
||||
// Setup the web server.
|
||||
mux := http.NewServeMux()
|
||||
|
||||
|
@ -352,6 +374,9 @@ func run(fs *flag.FlagSet) int {
|
|||
}
|
||||
wg.Go(func() error { return ao.Run(ctx) })
|
||||
wg.Go(func() error { return to.Run(ctx) })
|
||||
if kec != nil {
|
||||
wg.Go(func() error { return kec.Run(ctx) })
|
||||
}
|
||||
|
||||
term := make(chan os.Signal, 1)
|
||||
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
|
||||
|
|
269
pkg/kubelet/controller.go
Normal file
269
pkg/kubelet/controller.go
Normal file
|
@ -0,0 +1,269 @@
|
|||
// Copyright 2023 The prometheus-operator Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
"github.com/go-kit/log/level"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
|
||||
)
|
||||
|
||||
const resyncPeriod = 3 * time.Minute
|
||||
|
||||
type Controller struct {
|
||||
logger log.Logger
|
||||
|
||||
kclient kubernetes.Interface
|
||||
|
||||
nodeAddressLookupErrors prometheus.Counter
|
||||
nodeEndpointSyncs prometheus.Counter
|
||||
nodeEndpointSyncErrors prometheus.Counter
|
||||
|
||||
kubeletObjectName string
|
||||
kubeletObjectNamespace string
|
||||
kubeletSelector string
|
||||
|
||||
annotations operator.Map
|
||||
labels operator.Map
|
||||
}
|
||||
|
||||
func New(
|
||||
logger log.Logger,
|
||||
restConfig *rest.Config,
|
||||
r prometheus.Registerer,
|
||||
kubeletObject string,
|
||||
kubeletSelector operator.LabelSelector,
|
||||
commonAnnotations operator.Map,
|
||||
commonLabels operator.Map,
|
||||
) (*Controller, error) {
|
||||
client, err := kubernetes.NewForConfig(restConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("instantiating kubernetes client failed: %w", err)
|
||||
}
|
||||
|
||||
c := &Controller{
|
||||
kclient: client,
|
||||
|
||||
nodeAddressLookupErrors: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_operator_node_address_lookup_errors_total",
|
||||
Help: "Number of times a node IP address could not be determined",
|
||||
}),
|
||||
nodeEndpointSyncs: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_operator_node_syncs_total",
|
||||
Help: "Number of node endpoints synchronisations",
|
||||
}),
|
||||
nodeEndpointSyncErrors: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_operator_node_syncs_failed_total",
|
||||
Help: "Number of node endpoints synchronisation failures",
|
||||
}),
|
||||
|
||||
kubeletSelector: kubeletSelector.String(),
|
||||
|
||||
annotations: commonAnnotations,
|
||||
labels: commonLabels,
|
||||
}
|
||||
|
||||
r.MustRegister(
|
||||
c.nodeAddressLookupErrors,
|
||||
c.nodeEndpointSyncs,
|
||||
c.nodeEndpointSyncErrors,
|
||||
)
|
||||
|
||||
parts := strings.Split(kubeletObject, "/")
|
||||
if len(parts) != 2 {
|
||||
return nil, fmt.Errorf("malformatted kubelet object string %q, must be in format \"namespace/name\"", kubeletObject)
|
||||
}
|
||||
c.kubeletObjectNamespace = parts[0]
|
||||
c.kubeletObjectName = parts[1]
|
||||
|
||||
c.logger = log.With(logger, "kubelet_object", kubeletObject)
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Controller) Run(ctx context.Context) error {
|
||||
ticker := time.NewTicker(resyncPeriod)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
c.syncNodeEndpointsWithLogError(ctx)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nodeAddress returns the provided node's address, based on the priority:
|
||||
// 1. NodeInternalIP
|
||||
// 2. NodeExternalIP
|
||||
//
|
||||
// Copied from github.com/prometheus/prometheus/discovery/kubernetes/node.go
|
||||
func nodeAddress(node v1.Node) (string, map[v1.NodeAddressType][]string, error) {
|
||||
m := map[v1.NodeAddressType][]string{}
|
||||
for _, a := range node.Status.Addresses {
|
||||
m[a.Type] = append(m[a.Type], a.Address)
|
||||
}
|
||||
|
||||
if addresses, ok := m[v1.NodeInternalIP]; ok {
|
||||
return addresses[0], m, nil
|
||||
}
|
||||
if addresses, ok := m[v1.NodeExternalIP]; ok {
|
||||
return addresses[0], m, nil
|
||||
}
|
||||
return "", m, fmt.Errorf("host address unknown")
|
||||
}
|
||||
|
||||
func getNodeAddresses(nodes *v1.NodeList) ([]v1.EndpointAddress, []error) {
|
||||
addresses := make([]v1.EndpointAddress, 0)
|
||||
errs := make([]error, 0)
|
||||
|
||||
for _, n := range nodes.Items {
|
||||
address, _, err := nodeAddress(n)
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("failed to determine hostname for node (%s): %w", n.Name, err))
|
||||
continue
|
||||
}
|
||||
addresses = append(addresses, v1.EndpointAddress{
|
||||
IP: address,
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Node",
|
||||
Name: n.Name,
|
||||
UID: n.UID,
|
||||
APIVersion: n.APIVersion,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return addresses, errs
|
||||
}
|
||||
|
||||
func (c *Controller) syncNodeEndpointsWithLogError(ctx context.Context) {
|
||||
level.Debug(c.logger).Log("msg", "Synchronizing nodes")
|
||||
|
||||
c.nodeEndpointSyncs.Inc()
|
||||
err := c.syncNodeEndpoints(ctx)
|
||||
if err != nil {
|
||||
c.nodeEndpointSyncErrors.Inc()
|
||||
level.Error(c.logger).Log("msg", "Failed to synchronize nodes", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) syncNodeEndpoints(ctx context.Context) error {
|
||||
eps := &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: c.kubeletObjectName,
|
||||
Annotations: c.annotations,
|
||||
Labels: c.labels.Merge(map[string]string{
|
||||
"k8s-app": "kubelet",
|
||||
"app.kubernetes.io/name": "kubelet",
|
||||
"app.kubernetes.io/managed-by": "prometheus-operator",
|
||||
}),
|
||||
},
|
||||
Subsets: []v1.EndpointSubset{
|
||||
{
|
||||
Ports: []v1.EndpointPort{
|
||||
{
|
||||
Name: "https-metrics",
|
||||
Port: 10250,
|
||||
},
|
||||
{
|
||||
Name: "http-metrics",
|
||||
Port: 10255,
|
||||
},
|
||||
{
|
||||
Name: "cadvisor",
|
||||
Port: 4194,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
nodes, err := c.kclient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: c.kubeletSelector})
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing nodes failed: %w", err)
|
||||
}
|
||||
|
||||
level.Debug(c.logger).Log("msg", "Nodes retrieved from the Kubernetes API", "num_nodes", len(nodes.Items))
|
||||
|
||||
addresses, errs := getNodeAddresses(nodes)
|
||||
if len(errs) > 0 {
|
||||
for _, err := range errs {
|
||||
level.Warn(c.logger).Log("err", err)
|
||||
}
|
||||
c.nodeAddressLookupErrors.Add(float64(len(errs)))
|
||||
}
|
||||
level.Debug(c.logger).Log("msg", "Nodes converted to endpoint addresses", "num_addresses", len(addresses))
|
||||
|
||||
eps.Subsets[0].Addresses = addresses
|
||||
|
||||
svc := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: c.kubeletObjectName,
|
||||
Annotations: c.annotations,
|
||||
Labels: c.labels.Merge(map[string]string{
|
||||
"k8s-app": "kubelet",
|
||||
"app.kubernetes.io/name": "kubelet",
|
||||
"app.kubernetes.io/managed-by": "prometheus-operator",
|
||||
}),
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
ClusterIP: "None",
|
||||
Ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "https-metrics",
|
||||
Port: 10250,
|
||||
},
|
||||
{
|
||||
Name: "http-metrics",
|
||||
Port: 10255,
|
||||
},
|
||||
{
|
||||
Name: "cadvisor",
|
||||
Port: 4194,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
level.Debug(c.logger).Log("msg", "Updating Kubernetes service", "service")
|
||||
err = k8sutil.CreateOrUpdateService(ctx, c.kclient.CoreV1().Services(c.kubeletObjectNamespace), svc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("synchronizing kubelet service object failed: %w", err)
|
||||
}
|
||||
|
||||
level.Debug(c.logger).Log("msg", "Updating Kubernetes endpoint")
|
||||
err = k8sutil.CreateOrUpdateEndpoints(ctx, c.kclient.CoreV1().Endpoints(c.kubeletObjectNamespace), eps)
|
||||
if err != nil {
|
||||
return fmt.Errorf("synchronizing kubelet endpoints object failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
103
pkg/kubelet/controller_test.go
Normal file
103
pkg/kubelet/controller_test.go
Normal file
|
@ -0,0 +1,103 @@
|
|||
// Copyright 2023 The prometheus-operator Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func TestGetNodeAddresses(t *testing.T) {
|
||||
for _, c := range []struct {
|
||||
name string
|
||||
nodes *v1.NodeList
|
||||
expectedAddresses []string
|
||||
expectedErrors int
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
nodes: &v1.NodeList{
|
||||
Items: []v1.Node{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node-0",
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Addresses: []v1.NodeAddress{
|
||||
{
|
||||
Address: "10.0.0.1",
|
||||
Type: v1.NodeInternalIP,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedAddresses: []string{"10.0.0.1"},
|
||||
expectedErrors: 0,
|
||||
},
|
||||
{
|
||||
// Replicates #1815
|
||||
name: "missing ip on one node",
|
||||
nodes: &v1.NodeList{
|
||||
Items: []v1.Node{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node-0",
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Addresses: []v1.NodeAddress{
|
||||
{
|
||||
Address: "node-0",
|
||||
Type: v1.NodeHostName,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node-1",
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Addresses: []v1.NodeAddress{
|
||||
{
|
||||
Address: "10.0.0.1",
|
||||
Type: v1.NodeInternalIP,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedAddresses: []string{"10.0.0.1"},
|
||||
expectedErrors: 1,
|
||||
},
|
||||
} {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
addrs, errs := getNodeAddresses(c.nodes)
|
||||
require.Equal(t, c.expectedErrors, len(errs))
|
||||
|
||||
ips := make([]string, 0)
|
||||
for _, addr := range addrs {
|
||||
ips = append(ips, addr.IP)
|
||||
}
|
||||
|
||||
require.Equal(t, c.expectedAddresses, ips)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -33,10 +33,6 @@ type Config struct {
|
|||
// Version reported by the Kubernetes API.
|
||||
KubernetesVersion version.Info
|
||||
|
||||
// Parameters for the kubelet endpoint controller.
|
||||
KubeletObject string
|
||||
KubeletSelector LabelSelector
|
||||
|
||||
// Cluster domain for Kubernetes services managed by the operator.
|
||||
ClusterDomain string
|
||||
|
||||
|
|
|
@ -82,15 +82,6 @@ type Operator struct {
|
|||
reconciliations *operator.ReconciliationTracker
|
||||
statusReporter prompkg.StatusReporter
|
||||
|
||||
nodeAddressLookupErrors prometheus.Counter
|
||||
nodeEndpointSyncs prometheus.Counter
|
||||
nodeEndpointSyncErrors prometheus.Counter
|
||||
|
||||
kubeletObjectName string
|
||||
kubeletObjectNamespace string
|
||||
kubeletSelector string
|
||||
kubeletSyncEnabled bool
|
||||
|
||||
endpointSliceSupported bool
|
||||
scrapeConfigSupported bool
|
||||
canReadStorageClass bool
|
||||
|
@ -113,20 +104,6 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
|
|||
return nil, fmt.Errorf("instantiating monitoring client failed: %w", err)
|
||||
}
|
||||
|
||||
kubeletObjectName := ""
|
||||
kubeletObjectNamespace := ""
|
||||
kubeletSyncEnabled := false
|
||||
|
||||
if c.KubeletObject != "" {
|
||||
parts := strings.Split(c.KubeletObject, "/")
|
||||
if len(parts) != 2 {
|
||||
return nil, fmt.Errorf("malformatted kubelet object string, must be in format \"namespace/name\"")
|
||||
}
|
||||
kubeletObjectNamespace = parts[0]
|
||||
kubeletObjectName = parts[1]
|
||||
kubeletSyncEnabled = true
|
||||
}
|
||||
|
||||
// All the metrics exposed by the controller get the controller="prometheus" label.
|
||||
r = prometheus.WrapRegistererWith(prometheus.Labels{"controller": "prometheus"}, r)
|
||||
|
||||
|
@ -137,11 +114,6 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
|
|||
logger: logger,
|
||||
accessor: operator.NewAccessor(logger),
|
||||
|
||||
kubeletObjectName: kubeletObjectName,
|
||||
kubeletObjectNamespace: kubeletObjectNamespace,
|
||||
kubeletSyncEnabled: kubeletSyncEnabled,
|
||||
kubeletSelector: c.KubeletSelector.String(),
|
||||
|
||||
config: prompkg.Config{
|
||||
LocalHost: c.LocalHost,
|
||||
ReloaderConfig: c.ReloaderConfig,
|
||||
|
@ -152,27 +124,11 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
|
|||
},
|
||||
metrics: operator.NewMetrics(r),
|
||||
reconciliations: &operator.ReconciliationTracker{},
|
||||
nodeAddressLookupErrors: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_operator_node_address_lookup_errors_total",
|
||||
Help: "Number of times a node IP address could not be determined",
|
||||
}),
|
||||
nodeEndpointSyncs: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_operator_node_syncs_total",
|
||||
Help: "Number of node endpoints synchronisations",
|
||||
}),
|
||||
nodeEndpointSyncErrors: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_operator_node_syncs_failed_total",
|
||||
Help: "Number of node endpoints synchronisation failures",
|
||||
}),
|
||||
|
||||
scrapeConfigSupported: scrapeConfigSupported,
|
||||
canReadStorageClass: canReadStorageClass,
|
||||
}
|
||||
o.metrics.MustRegister(
|
||||
o.nodeAddressLookupErrors,
|
||||
o.nodeEndpointSyncs,
|
||||
o.nodeEndpointSyncErrors,
|
||||
o.reconciliations,
|
||||
)
|
||||
o.metrics.MustRegister(o.reconciliations)
|
||||
|
||||
o.rr = operator.NewResourceReconciler(
|
||||
o.logger,
|
||||
|
@ -509,10 +465,6 @@ func (c *Operator) Run(ctx context.Context) error {
|
|||
|
||||
c.addHandlers()
|
||||
|
||||
if c.kubeletSyncEnabled {
|
||||
go c.reconcileNodeEndpoints(ctx)
|
||||
}
|
||||
|
||||
// TODO(simonpasquier): watch for Prometheus pods instead of polling.
|
||||
go operator.StatusPoller(ctx, c)
|
||||
|
||||
|
@ -536,169 +488,6 @@ func (c *Operator) RefreshStatusFor(o metav1.Object) {
|
|||
c.rr.EnqueueForStatus(o)
|
||||
}
|
||||
|
||||
func (c *Operator) reconcileNodeEndpoints(ctx context.Context) {
|
||||
c.syncNodeEndpointsWithLogError(ctx)
|
||||
ticker := time.NewTicker(3 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.syncNodeEndpointsWithLogError(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nodeAddress returns the provided node's address, based on the priority:
|
||||
// 1. NodeInternalIP
|
||||
// 2. NodeExternalIP
|
||||
//
|
||||
// Copied from github.com/prometheus/prometheus/discovery/kubernetes/node.go
|
||||
func nodeAddress(node v1.Node) (string, map[v1.NodeAddressType][]string, error) {
|
||||
m := map[v1.NodeAddressType][]string{}
|
||||
for _, a := range node.Status.Addresses {
|
||||
m[a.Type] = append(m[a.Type], a.Address)
|
||||
}
|
||||
|
||||
if addresses, ok := m[v1.NodeInternalIP]; ok {
|
||||
return addresses[0], m, nil
|
||||
}
|
||||
if addresses, ok := m[v1.NodeExternalIP]; ok {
|
||||
return addresses[0], m, nil
|
||||
}
|
||||
return "", m, fmt.Errorf("host address unknown")
|
||||
}
|
||||
|
||||
func getNodeAddresses(nodes *v1.NodeList) ([]v1.EndpointAddress, []error) {
|
||||
addresses := make([]v1.EndpointAddress, 0)
|
||||
errs := make([]error, 0)
|
||||
|
||||
for _, n := range nodes.Items {
|
||||
address, _, err := nodeAddress(n)
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("failed to determine hostname for node (%s): %w", n.Name, err))
|
||||
continue
|
||||
}
|
||||
addresses = append(addresses, v1.EndpointAddress{
|
||||
IP: address,
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Node",
|
||||
Name: n.Name,
|
||||
UID: n.UID,
|
||||
APIVersion: n.APIVersion,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return addresses, errs
|
||||
}
|
||||
|
||||
func (c *Operator) syncNodeEndpointsWithLogError(ctx context.Context) {
|
||||
level.Debug(c.logger).Log("msg", "Syncing nodes into Endpoints object")
|
||||
|
||||
c.nodeEndpointSyncs.Inc()
|
||||
err := c.syncNodeEndpoints(ctx)
|
||||
if err != nil {
|
||||
c.nodeEndpointSyncErrors.Inc()
|
||||
level.Error(c.logger).Log("msg", "Syncing nodes into Endpoints object failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Operator) syncNodeEndpoints(ctx context.Context) error {
|
||||
logger := log.With(c.logger, "operation", "syncNodeEndpoints")
|
||||
eps := &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: c.kubeletObjectName,
|
||||
Annotations: c.config.Annotations,
|
||||
Labels: c.config.Labels.Merge(map[string]string{
|
||||
"k8s-app": "kubelet",
|
||||
"app.kubernetes.io/name": "kubelet",
|
||||
"app.kubernetes.io/managed-by": "prometheus-operator",
|
||||
}),
|
||||
},
|
||||
Subsets: []v1.EndpointSubset{
|
||||
{
|
||||
Ports: []v1.EndpointPort{
|
||||
{
|
||||
Name: "https-metrics",
|
||||
Port: 10250,
|
||||
},
|
||||
{
|
||||
Name: "http-metrics",
|
||||
Port: 10255,
|
||||
},
|
||||
{
|
||||
Name: "cadvisor",
|
||||
Port: 4194,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
nodes, err := c.kclient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: c.kubeletSelector})
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing nodes failed: %w", err)
|
||||
}
|
||||
|
||||
level.Debug(logger).Log("msg", "Nodes retrieved from the Kubernetes API", "num_nodes", len(nodes.Items))
|
||||
|
||||
addresses, errs := getNodeAddresses(nodes)
|
||||
if len(errs) > 0 {
|
||||
for _, err := range errs {
|
||||
level.Warn(logger).Log("err", err)
|
||||
}
|
||||
c.nodeAddressLookupErrors.Add(float64(len(errs)))
|
||||
}
|
||||
level.Debug(logger).Log("msg", "Nodes converted to endpoint addresses", "num_addresses", len(addresses))
|
||||
|
||||
eps.Subsets[0].Addresses = addresses
|
||||
|
||||
svc := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: c.kubeletObjectName,
|
||||
Labels: c.config.Labels.Merge(map[string]string{
|
||||
"k8s-app": "kubelet",
|
||||
"app.kubernetes.io/name": "kubelet",
|
||||
"app.kubernetes.io/managed-by": "prometheus-operator",
|
||||
}),
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
ClusterIP: "None",
|
||||
Ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "https-metrics",
|
||||
Port: 10250,
|
||||
},
|
||||
{
|
||||
Name: "http-metrics",
|
||||
Port: 10255,
|
||||
},
|
||||
{
|
||||
Name: "cadvisor",
|
||||
Port: 4194,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
level.Debug(logger).Log("msg", "Updating Kubernetes service", "service", c.kubeletObjectName, "ns", c.kubeletObjectNamespace)
|
||||
err = k8sutil.CreateOrUpdateService(ctx, c.kclient.CoreV1().Services(c.kubeletObjectNamespace), svc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("synchronizing kubelet service object failed: %w", err)
|
||||
}
|
||||
|
||||
level.Debug(logger).Log("msg", "Updating Kubernetes endpoint", "endpoint", c.kubeletObjectName, "ns", c.kubeletObjectNamespace)
|
||||
err = k8sutil.CreateOrUpdateEndpoints(ctx, c.kclient.CoreV1().Endpoints(c.kubeletObjectNamespace), eps)
|
||||
if err != nil {
|
||||
return fmt.Errorf("synchronizing kubelet endpoints object failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Don't enqueue just for the namespace
|
||||
func (c *Operator) handleSmonAdd(obj interface{}) {
|
||||
o, ok := c.accessor.ObjectMetadata(obj)
|
||||
|
|
|
@ -15,10 +15,8 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/kylelemons/godebug/pretty"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
|
@ -241,86 +239,3 @@ func TestCreateStatefulSetInputHash(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetNodeAddresses(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
nodes *v1.NodeList
|
||||
expectedAddresses []string
|
||||
expectedErrors int
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
nodes: &v1.NodeList{
|
||||
Items: []v1.Node{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node-0",
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Addresses: []v1.NodeAddress{
|
||||
{
|
||||
Address: "10.0.0.1",
|
||||
Type: v1.NodeInternalIP,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedAddresses: []string{"10.0.0.1"},
|
||||
expectedErrors: 0,
|
||||
},
|
||||
{
|
||||
// Replicates #1815
|
||||
name: "missing ip on one node",
|
||||
nodes: &v1.NodeList{
|
||||
Items: []v1.Node{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node-0",
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Addresses: []v1.NodeAddress{
|
||||
{
|
||||
Address: "node-0",
|
||||
Type: v1.NodeHostName,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node-1",
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Addresses: []v1.NodeAddress{
|
||||
{
|
||||
Address: "10.0.0.1",
|
||||
Type: v1.NodeInternalIP,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedAddresses: []string{"10.0.0.1"},
|
||||
expectedErrors: 1,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
addrs, errs := getNodeAddresses(c.nodes)
|
||||
if len(errs) != c.expectedErrors {
|
||||
t.Errorf("Expected %d errors, got %d. Errors: %v", c.expectedErrors, len(errs), errs)
|
||||
}
|
||||
ips := make([]string, 0)
|
||||
for _, addr := range addrs {
|
||||
ips = append(ips, addr.IP)
|
||||
}
|
||||
if !reflect.DeepEqual(ips, c.expectedAddresses) {
|
||||
t.Error(pretty.Compare(ips, c.expectedAddresses))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue