
feat: support EndpointSlice for the kubelet controller ()

* feat: support EndpointSlice for the kubelet controller

This change adds support for managing `EndpointSlice` objects for the
kubelet service. The controller can manage `Endpoints` objects,
`EndpointSlice` objects, or both.

The migration path can be:
1. Configure the operator to manage both objects in the kubelet
   controller.
2. Verify that the generated `EndpointSlice` objects are correct.
3. Configure the Prometheus object to use the `EndpointSlice` role
   instead of `Endpoints`.
4. Configure the operator to manage only `EndpointSlice` objects in the
   kubelet controller.

The removal of the legacy `Endpoints` object is left to the user.
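
A sketch of steps 1 and 3 for illustration. The flag values come from this
change; the `serviceDiscoveryRole` field is assumed from the Prometheus CRD
and is not part of this commit:

    # Step 1: operator Deployment args (dual-write phase).
    - args:
      - --kubelet-service=kube-system/kubelet
      - --kubelet-endpoints=true
      - --kubelet-endpointslice=true

    # Step 3: switch service discovery to EndpointSlice (assumed CRD field).
    apiVersion: monitoring.coreos.com/v1
    kind: Prometheus
    metadata:
      name: k8s
    spec:
      serviceDiscoveryRole: EndpointSlice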

---------

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
Simon Pasquier 2024-09-17 08:53:31 +02:00 committed by GitHub
parent bb244317df
commit 8518981b50
17 changed files with 1133 additions and 371 deletions


@@ -1,3 +1,7 @@
## Unreleased
* [FEATURE] Add `-kubelet-endpointslice` argument to support `EndpointSlice` for the kubelet controller.
## 0.76.2 / 2024-09-09
* [BUGFIX] Fix OAuth2 TLSConfig nil pointer. #6909


@@ -57,6 +57,10 @@ Usage of ./operator:
PrometheusAgentDaemonSet: Enables the DaemonSet mode for PrometheusAgent (enabled: false)
-key-file string
- NOT RECOMMENDED FOR PRODUCTION - Path to private TLS certificate file.
-kubelet-endpoints
Create Endpoints objects for kubelet targets. (default true)
-kubelet-endpointslice
Create EndpointSlice objects for kubelet targets.
-kubelet-node-address-priority value
Node address priority used by kubelet. Either 'internal' or 'external'. Default: 'internal'.
-kubelet-selector value


@@ -77,7 +77,6 @@ rules:
resources:
- services
- services/finalizers
- endpoints
verbs:
- get
- create
@@ -119,6 +118,15 @@ rules:
- storageclasses
verbs:
- get
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
- create
- update
- delete
```
> Note: A cluster admin is required to create this `ClusterRole` and create a `ClusterRoleBinding` or `RoleBinding` to the `ServiceAccount` used by the Prometheus Operator `Pod`. The `ServiceAccount` used by the Prometheus Operator `Pod` can be specified in the `Deployment` object used to deploy it.
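
For illustration, a minimal binding could look like this (the name and namespace are placeholders; they must match the `ServiceAccount` referenced by your Deployment):

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: prometheus-operator
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: prometheus-operator
    subjects:
    - kind: ServiceAccount
      name: prometheus-operator  # placeholder
      namespace: default         # placeholder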


@@ -77,7 +77,6 @@ rules:
resources:
- services
- services/finalizers
- endpoints
verbs:
- get
- create
@@ -119,6 +118,15 @@ rules:
- storageclasses
verbs:
- get
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
- create
- update
- delete
```
Like Prometheus, the Prometheus Agent also requires permission to scrape targets. Because of this, we will create a new service account for the Agent with the necessary permissions.

bundle.yaml (generated)

@@ -64819,7 +64819,6 @@ rules:
resources:
- services
- services/finalizers
- endpoints
verbs:
- get
- create
@@ -64861,6 +64860,15 @@ rules:
- storageclasses
verbs:
- get
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
- create
- update
- delete
---
apiVersion: apps/v1
kind: Deployment
@@ -64891,6 +64899,8 @@ spec:
- args:
- --kubelet-service=kube-system/kubelet
- --prometheus-config-reloader=quay.io/prometheus-operator/prometheus-config-reloader:v0.76.2
- --kubelet-endpoints=true
- --kubelet-endpointslice=false
env:
- name: GOGC
value: "30"


@@ -25,6 +25,7 @@ import (
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"github.com/blang/semver/v4"
@@ -36,6 +37,7 @@ import (
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
@@ -118,9 +120,11 @@ var (
serverConfig = server.DefaultConfig(":8080", false)
// Parameters for the kubelet endpoints controller.
kubeletObject string
kubeletSelector operator.LabelSelector
nodeAddressPriority operator.NodeAddressPriority
kubeletObject string
kubeletSelector operator.LabelSelector
nodeAddressPriority operator.NodeAddressPriority
kubeletEndpoints bool
kubeletEndpointSlice bool
featureGates = k8sflag.NewMapStringBool(ptr.To(map[string]bool{}))
)
@@ -140,6 +144,8 @@ func parseFlags(fs *flag.FlagSet) {
fs.StringVar(&kubeletObject, "kubelet-service", "", "Service/Endpoints object to write kubelets into in format \"namespace/name\"")
fs.Var(&kubeletSelector, "kubelet-selector", "Label selector to filter nodes.")
fs.Var(&nodeAddressPriority, "kubelet-node-address-priority", "Node address priority used by kubelet. Either 'internal' or 'external'. Default: 'internal'.")
fs.BoolVar(&kubeletEndpointSlice, "kubelet-endpointslice", false, "Create EndpointSlice objects for kubelet targets.")
fs.BoolVar(&kubeletEndpoints, "kubelet-endpoints", true, "Create Endpoints objects for kubelet targets.")
// The Prometheus config reloader image is released along with the
// Prometheus Operator image, tagged with the same semver version. Default to
@@ -519,15 +525,55 @@ func run(fs *flag.FlagSet) int {
var kec *kubelet.Controller
if kubeletObject != "" {
opts := []kubelet.ControllerOption{kubelet.WithNodeAddressPriority(nodeAddressPriority.String())}
kubeletService := strings.Split(kubeletObject, "/")
if len(kubeletService) != 2 {
logger.Error(fmt.Sprintf("malformatted kubelet object string %q, must be in format \"namespace/name\"", kubeletObject))
cancel()
return 1
}
if kubeletEndpointSlice {
allowed, errs, err := k8sutil.IsAllowed(
ctx,
kclient.AuthorizationV1().SelfSubjectAccessReviews(),
[]string{kubeletService[0]},
k8sutil.ResourceAttribute{
Group: discoveryv1.SchemeGroupVersion.Group,
Version: discoveryv1.SchemeGroupVersion.Version,
Resource: "endpointslices",
Verbs: []string{"get", "list", "create", "update", "delete"},
})
if err != nil {
logger.Error(fmt.Sprintf("failed to check permissions on resource 'endpointslices' (group %q)", discoveryv1.SchemeGroupVersion.Group), "err", err)
cancel()
return 1
}
if !allowed {
for _, reason := range errs {
logger.Warn(fmt.Sprintf("missing permission on resource 'endpointslices' (group: %q)", discoveryv1.SchemeGroupVersion.Group), "reason", reason)
}
} else {
opts = append(opts, kubelet.WithEndpointSlice())
}
}
if kubeletEndpoints {
opts = append(opts, kubelet.WithEndpoints())
}
if kec, err = kubelet.New(
logger.With("component", "kubelet_endpoints"),
kclient,
r,
kubeletObject,
kubeletService[1],
kubeletService[0],
kubeletSelector,
cfg.Annotations,
cfg.Labels,
nodeAddressPriority,
opts...,
); err != nil {
logger.Error("instantiating kubelet endpoints controller failed", "err", err)
cancel()


@@ -55,7 +55,6 @@ rules:
resources:
- services
- services/finalizers
- endpoints
verbs:
- get
- create
@@ -97,3 +96,12 @@ rules:
- storageclasses
verbs:
- get
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
- create
- update
- delete


@@ -27,6 +27,8 @@ spec:
- args:
- --kubelet-service=kube-system/kubelet
- --prometheus-config-reloader=quay.io/prometheus-operator/prometheus-config-reloader:v0.76.2
- --kubelet-endpoints=true
- --kubelet-endpointslice=false
env:
- name: GOGC
value: "30"


@@ -27,6 +27,9 @@ local defaults = {
if !std.setMember(labelName, ['app.kubernetes.io/version'])
},
enableAlertmanagerConfigV1beta1: false,
kubeletService: 'kube-system/kubelet',
kubeletEndpointsEnabled: true,
kubeletEndpointSliceEnabled: false,
};
function(params) {
@@ -75,80 +78,106 @@ function(params) {
labels: po.config.commonLabels,
},
rules: [
{
apiGroups: ['monitoring.coreos.com'],
resources: [
'alertmanagers',
'alertmanagers/finalizers',
'alertmanagers/status',
'alertmanagerconfigs',
'prometheuses',
'prometheuses/finalizers',
'prometheuses/status',
'prometheusagents',
'prometheusagents/finalizers',
'prometheusagents/status',
'thanosrulers',
'thanosrulers/finalizers',
'thanosrulers/status',
'scrapeconfigs',
'servicemonitors',
'podmonitors',
'probes',
'prometheusrules',
],
verbs: ['*'],
},
{
apiGroups: ['apps'],
resources: ['statefulsets'],
verbs: ['*'],
},
{
apiGroups: [''],
resources: ['configmaps', 'secrets'],
verbs: ['*'],
},
{
apiGroups: [''],
resources: ['pods'],
verbs: ['list', 'delete'],
},
{
apiGroups: [''],
resources: [
'services',
'services/finalizers',
'endpoints',
],
verbs: ['get', 'create', 'update', 'delete'],
},
{
apiGroups: [''],
resources: ['nodes'],
verbs: ['list', 'watch'],
},
{
apiGroups: [''],
resources: ['namespaces'],
verbs: ['get', 'list', 'watch'],
},
{
apiGroups: [''],
resources: ['events'],
verbs: ['patch', 'create'],
},
{
apiGroups: ['networking.k8s.io'],
resources: ['ingresses'],
verbs: ['get', 'list', 'watch'],
},
{
apiGroups: ['storage.k8s.io'],
resources: ['storageclasses'],
verbs: ['get'],
},
],
{
apiGroups: ['monitoring.coreos.com'],
resources: [
'alertmanagers',
'alertmanagers/finalizers',
'alertmanagers/status',
'alertmanagerconfigs',
'prometheuses',
'prometheuses/finalizers',
'prometheuses/status',
'prometheusagents',
'prometheusagents/finalizers',
'prometheusagents/status',
'thanosrulers',
'thanosrulers/finalizers',
'thanosrulers/status',
'scrapeconfigs',
'servicemonitors',
'podmonitors',
'probes',
'prometheusrules',
],
verbs: ['*'],
},
{
apiGroups: ['apps'],
resources: ['statefulsets'],
verbs: ['*'],
},
{
apiGroups: [''],
resources: ['configmaps', 'secrets'],
verbs: ['*'],
},
{
apiGroups: [''],
resources: ['pods'],
verbs: ['list', 'delete'],
},
{
apiGroups: [''],
resources: [
'services',
'services/finalizers',
],
verbs: ['get', 'create', 'update', 'delete'],
},
{
apiGroups: [''],
resources: ['nodes'],
verbs: ['list', 'watch'],
},
{
apiGroups: [''],
resources: ['namespaces'],
verbs: ['get', 'list', 'watch'],
},
{
apiGroups: [''],
resources: ['events'],
verbs: ['patch', 'create'],
},
{
apiGroups: ['networking.k8s.io'],
resources: ['ingresses'],
verbs: ['get', 'list', 'watch'],
},
{
apiGroups: ['storage.k8s.io'],
resources: ['storageclasses'],
verbs: ['get'],
},
] + (
if po.config.kubeletEndpointsEnabled then
[
{
apiGroups: [''],
resources: [
'endpoints',
],
verbs: ['get', 'create', 'update', 'delete'],
},
]
else
[]
)
+ (
if po.config.kubeletEndpointSliceEnabled then
[
{
apiGroups: ['discovery.k8s.io'],
resources: [
'endpointslices',
],
verbs: ['get', 'create', 'list', 'update', 'delete'],
},
]
else
[]
),
},
deployment:
@@ -161,9 +190,11 @@ function(params) {
name: po.config.name,
image: po.config.image,
args: [
'--kubelet-service=kube-system/kubelet',
'--kubelet-service=' + po.config.kubeletService,
'--prometheus-config-reloader=' + po.config.configReloaderImage,
] +
[std.format('--kubelet-endpoints=%s', po.config.kubeletEndpointsEnabled)] +
[std.format('--kubelet-endpointslice=%s', po.config.kubeletEndpointSliceEnabled)] +
reloaderResourceArg('--config-reloader-cpu-limit', po.config.configReloaderResources.limits.cpu) +
reloaderResourceArg('--config-reloader-memory-limit', po.config.configReloaderResources.limits.memory) +
reloaderResourceArg('--config-reloader-cpu-request', po.config.configReloaderResources.requests.cpu) +


@@ -564,7 +564,7 @@ func (c *Operator) sync(ctx context.Context, key string) error {
// Create governing service if it doesn't exist.
svcClient := c.kclient.CoreV1().Services(am.Namespace)
if err = k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(am, c.config)); err != nil {
if _, err = k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(am, c.config)); err != nil {
return fmt.Errorf("synchronizing governing service failed: %w", err)
}


@@ -29,6 +29,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
authv1 "k8s.io/api/authorization/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -40,6 +41,7 @@ import (
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
clientauthv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
clientdiscoveryv1 "k8s.io/client-go/kubernetes/typed/discovery/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
@@ -229,16 +231,18 @@ func IsResourceNotFoundError(err error) bool {
return false
}
func CreateOrUpdateService(ctx context.Context, sclient clientv1.ServiceInterface, svc *v1.Service) error {
func CreateOrUpdateService(ctx context.Context, sclient clientv1.ServiceInterface, svc *v1.Service) (*v1.Service, error) {
var ret *v1.Service
// As stated in the RetryOnConflict's documentation, the returned error shouldn't be wrapped.
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
service, err := sclient.Get(ctx, svc.Name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
_, err = sclient.Create(ctx, svc, metav1.CreateOptions{})
ret, err = sclient.Create(ctx, svc, metav1.CreateOptions{})
return err
}
@@ -251,9 +255,11 @@ func CreateOrUpdateService(ctx context.Context, sclient clientv1.ServiceInterfac
svc.SetOwnerReferences(mergeOwnerReferences(service.GetOwnerReferences(), svc.GetOwnerReferences()))
mergeMetadata(&svc.ObjectMeta, service.ObjectMeta)
_, err = sclient.Update(ctx, svc, metav1.UpdateOptions{})
ret, err = sclient.Update(ctx, svc, metav1.UpdateOptions{})
return err
})
return ret, err
}
func CreateOrUpdateEndpoints(ctx context.Context, eclient clientv1.EndpointsInterface, eps *v1.Endpoints) error {
@@ -276,6 +282,31 @@ func CreateOrUpdateEndpoints(ctx context.Context, eclient clientv1.EndpointsInte
})
}
func CreateOrUpdateEndpointSlice(ctx context.Context, c clientdiscoveryv1.EndpointSliceInterface, eps *discoveryv1.EndpointSlice) error {
// As stated in the RetryOnConflict's documentation, the returned error shouldn't be wrapped.
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
if eps.Name == "" {
_, err := c.Create(ctx, eps, metav1.CreateOptions{})
return err
}
endpoints, err := c.Get(ctx, eps.Name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
_, err = c.Create(ctx, eps, metav1.CreateOptions{})
return err
}
mergeMetadata(&eps.ObjectMeta, endpoints.ObjectMeta)
_, err = c.Update(ctx, eps, metav1.UpdateOptions{})
return err
})
}
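// Illustrative only, not part of this diff: a hypothetical caller combining
// the two helpers above. The *v1.Service returned by CreateOrUpdateService
// carries server-populated fields such as the UID, which the EndpointSlice
// owner reference needs. All object names are placeholders.
func exampleSyncServiceAndSlice(ctx context.Context, sclient clientv1.ServiceInterface, esclient clientdiscoveryv1.EndpointSliceInterface, desired *v1.Service) error {
	// Create or update the governing service first to obtain its UID.
	svc, err := CreateOrUpdateService(ctx, sclient, desired)
	if err != nil {
		return fmt.Errorf("synchronizing service failed: %w", err)
	}
	// Mirror a slice owned by (and labeled for) the service; an empty Name
	// plus GenerateName takes the create path of CreateOrUpdateEndpointSlice.
	eps := &discoveryv1.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: svc.Name + "-",
			Labels:       map[string]string{discoveryv1.LabelServiceName: svc.Name},
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion: "v1",
				Kind:       "Service",
				Name:       svc.Name,
				UID:        svc.UID, // known only after the create/update round-trip
			}},
		},
		AddressType: discoveryv1.AddressTypeIPv4,
	}
	return CreateOrUpdateEndpointSlice(ctx, esclient, eps)
}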
// UpdateStatefulSet merges metadata of existing StatefulSet with new one and updates it.
func UpdateStatefulSet(ctx context.Context, sstClient clientappsv1.StatefulSetInterface, sset *appsv1.StatefulSet) error {
// As stated in the RetryOnConflict's documentation, the returned error shouldn't be wrapped.


@@ -315,7 +315,7 @@ func TestMergeMetadata(t *testing.T) {
_, err := svcClient.Update(context.Background(), modifiedSvc, metav1.UpdateOptions{})
require.NoError(t, err)
err = CreateOrUpdateService(context.Background(), svcClient, service)
_, err = CreateOrUpdateService(context.Background(), svcClient, service)
require.NoError(t, err)
updatedSvc, err := svcClient.Get(context.Background(), "prometheus-operated", metav1.GetOptions{})
@@ -504,7 +504,8 @@ func TestCreateOrUpdateImmutableFields(t *testing.T) {
Status: corev1.ServiceStatus{},
}
require.NoError(t, CreateOrUpdateService(context.TODO(), svcClient, modifiedSvc))
_, err := CreateOrUpdateService(context.TODO(), svcClient, modifiedSvc)
require.NoError(t, err)
require.Equal(t, service.Spec.IPFamilies, modifiedSvc.Spec.IPFamilies, "services Spec.IPFamilies are not equal, expected %q, got %q",
service.Spec.IPFamilies, modifiedSvc.Spec.IPFamilies)


@@ -18,19 +18,39 @@ import (
"context"
"fmt"
"log/slog"
"net"
"slices"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/ptr"
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
)
const resyncPeriod = 3 * time.Minute
const (
resyncPeriod = 3 * time.Minute
maxEndpointsPerSlice = 512
endpointsLabel = "endpoints"
endpointSliceLabel = "endpointslice"
httpsPort = int32(10250)
httpsPortName = "https-metrics"
httpPort = int32(10255)
httpPortName = "http-metrics"
cAdvisorPort = int32(4194)
cAdvisorPortName = "cadvisor"
)
type Controller struct {
logger *slog.Logger
@@ -38,8 +58,8 @@ type Controller struct {
kclient kubernetes.Interface
nodeAddressLookupErrors prometheus.Counter
nodeEndpointSyncs prometheus.Counter
nodeEndpointSyncErrors prometheus.Counter
nodeEndpointSyncs *prometheus.CounterVec
nodeEndpointSyncErrors *prometheus.CounterVec
kubeletObjectName string
kubeletObjectNamespace string
@@ -48,18 +68,49 @@ type Controller struct {
annotations operator.Map
labels operator.Map
nodeAddressPriority string
nodeAddressPriority string
maxEndpointsPerSlice int
manageEndpointSlice bool
manageEndpoints bool
}
type ControllerOption func(*Controller)
func WithEndpointSlice() ControllerOption {
return func(c *Controller) {
c.manageEndpointSlice = true
}
}
func WithMaxEndpointsPerSlice(v int) ControllerOption {
return func(c *Controller) {
c.maxEndpointsPerSlice = v
}
}
func WithEndpoints() ControllerOption {
return func(c *Controller) {
c.manageEndpoints = true
}
}
func WithNodeAddressPriority(s string) ControllerOption {
return func(c *Controller) {
c.nodeAddressPriority = s
}
}
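// Illustrative wiring of the options above, not part of this diff; the logger,
// client, registry, selector, annotations and labels are placeholders supplied
// by the caller (see the cmd/operator/main.go hunk for the real call site):
//
//	c, err := New(
//		logger,
//		kclient,
//		registry,
//		"kubelet", "kube-system", // kubelet service name and namespace
//		selector, annotations, labels,
//		WithEndpoints(),
//		WithEndpointSlice(),
//		WithMaxEndpointsPerSlice(512),
//		WithNodeAddressPriority("internal"),
//	)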
func New(
logger *slog.Logger,
kclient *kubernetes.Clientset,
kclient kubernetes.Interface,
r prometheus.Registerer,
kubeletObject string,
kubeletServiceName string,
kubeletServiceNamespace string,
kubeletSelector operator.LabelSelector,
commonAnnotations operator.Map,
commonLabels operator.Map,
nodeAddressPriority operator.NodeAddressPriority,
opts ...ControllerOption,
) (*Controller, error) {
c := &Controller{
kclient: kclient,
@@ -68,46 +119,97 @@ func New(
Name: "prometheus_operator_node_address_lookup_errors_total",
Help: "Number of times a node IP address could not be determined",
}),
nodeEndpointSyncs: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_operator_node_syncs_total",
Help: "Number of node endpoints synchronisations",
}),
nodeEndpointSyncErrors: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_operator_node_syncs_failed_total",
Help: "Number of node endpoints synchronisation failures",
}),
nodeEndpointSyncs: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "prometheus_operator_node_syncs_total",
Help: "Total number of synchronisations for the given resource",
},
[]string{"resource"},
),
nodeEndpointSyncErrors: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "prometheus_operator_node_syncs_failed_total",
Help: "Total number of failed synchronisations for the given resource",
},
[]string{"resource"},
),
kubeletSelector: kubeletSelector.String(),
kubeletObjectName: kubeletServiceName,
kubeletObjectNamespace: kubeletServiceNamespace,
kubeletSelector: kubeletSelector.String(),
maxEndpointsPerSlice: maxEndpointsPerSlice,
annotations: commonAnnotations,
labels: commonLabels,
nodeAddressPriority: nodeAddressPriority.String(),
}
for _, opt := range opts {
opt(c)
}
if !c.manageEndpoints && !c.manageEndpointSlice {
return nil, fmt.Errorf("at least one of endpoints or endpointslice needs to be enabled")
}
for _, v := range []string{
endpointsLabel,
endpointSliceLabel,
} {
c.nodeEndpointSyncs.WithLabelValues(v)
c.nodeEndpointSyncErrors.WithLabelValues(v)
}
if r == nil {
r = prometheus.NewRegistry()
}
r.MustRegister(
c.nodeAddressLookupErrors,
c.nodeEndpointSyncs,
c.nodeEndpointSyncErrors,
prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Name: "prometheus_operator_kubelet_managed_resource",
Help: "",
ConstLabels: prometheus.Labels{
"resource": endpointsLabel,
},
},
func() float64 {
if c.manageEndpoints {
return 1.0
}
return 0.0
},
),
prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Name: "prometheus_operator_kubelet_managed_resource",
Help: "",
ConstLabels: prometheus.Labels{
"resource": endpointSliceLabel,
},
},
func() float64 {
if c.manageEndpointSlice {
return 1.0
}
return 0.0
},
),
)
parts := strings.Split(kubeletObject, "/")
if len(parts) != 2 {
return nil, fmt.Errorf("malformatted kubelet object string %q, must be in format \"namespace/name\"", kubeletObject)
}
c.kubeletObjectNamespace = parts[0]
c.kubeletObjectName = parts[1]
c.logger = logger.With("kubelet_object", kubeletObject)
c.logger = logger.With("kubelet_object", fmt.Sprintf("%s/%s", c.kubeletObjectNamespace, c.kubeletObjectName))
return c, nil
}
func (c *Controller) Run(ctx context.Context) error {
c.logger.Info("Starting controller")
ticker := time.NewTicker(resyncPeriod)
defer ticker.Stop()
for {
c.syncNodeEndpointsWithLogError(ctx)
c.sync(ctx)
select {
case <-ctx.Done():
@@ -161,35 +263,80 @@ func nodeReadyConditionKnown(node v1.Node) bool {
return false
}
func (c *Controller) getNodeAddresses(nodes *v1.NodeList) ([]v1.EndpointAddress, []error) {
addresses := make([]v1.EndpointAddress, 0)
errs := make([]error, 0)
readyKnownNodes := make(map[string]string)
readyUnknownNodes := make(map[string]string)
type nodeAddress struct {
apiVersion string
ipAddress string
name string
uid types.UID
ipv4 bool
ready bool
}
for _, n := range nodes.Items {
func (na *nodeAddress) discoveryV1Endpoint() discoveryv1.Endpoint {
return discoveryv1.Endpoint{
Addresses: []string{na.ipAddress},
Conditions: discoveryv1.EndpointConditions{
Ready: ptr.To(true),
},
TargetRef: &v1.ObjectReference{
Kind: "Node",
Name: na.name,
UID: na.uid,
APIVersion: na.apiVersion,
},
}
}
func (na *nodeAddress) v1EndpointAddress() v1.EndpointAddress {
return v1.EndpointAddress{
IP: na.ipAddress,
TargetRef: &v1.ObjectReference{
Kind: "Node",
Name: na.name,
UID: na.uid,
APIVersion: na.apiVersion,
},
}
}
func (c *Controller) getNodeAddresses(nodes []v1.Node) ([]nodeAddress, []error) {
var (
addresses = make([]nodeAddress, 0, len(nodes))
readyKnownNodes = map[string]string{}
readyUnknownNodes = map[string]string{}
errs []error
)
for _, n := range nodes {
address, _, err := c.nodeAddress(n)
if err != nil {
errs = append(errs, fmt.Errorf("failed to determine hostname for node (%s): %w", n.Name, err))
errs = append(errs, fmt.Errorf("failed to determine hostname for node %q (priority: %s): %w", n.Name, c.nodeAddressPriority, err))
continue
}
addresses = append(addresses, v1.EndpointAddress{
IP: address,
TargetRef: &v1.ObjectReference{
Kind: "Node",
Name: n.Name,
UID: n.UID,
APIVersion: n.APIVersion,
},
})
if !nodeReadyConditionKnown(n) {
if c.logger != nil {
c.logger.Info("Node Ready condition is Unknown", "node", n.GetName())
}
ip := net.ParseIP(address)
if ip == nil {
errs = append(errs, fmt.Errorf("failed to parse IP address %q for node %q (priority: %s): %w", address, n.Name, c.nodeAddressPriority, err))
continue
}
na := nodeAddress{
ipAddress: address,
name: n.Name,
uid: n.UID,
apiVersion: n.APIVersion,
ipv4: ip.To4() != nil,
ready: nodeReadyConditionKnown(n),
}
addresses = append(addresses, na)
if !na.ready {
c.logger.Info("Node Ready condition is Unknown", "node", n.GetName())
readyUnknownNodes[address] = n.Name
continue
}
readyKnownNodes[address] = n.Name
}
@@ -197,31 +344,71 @@ func (c *Controller) getNodeAddresses(nodes *v1.NodeList) ([]v1.EndpointAddress,
// duplicate IP address. If this is the case, we want to keep just the node
// with the duplicate IP address that has a known ready state. This also
// ensures that order of addresses are preserved.
addressesFinal := make([]v1.EndpointAddress, 0)
addressesFinal := make([]nodeAddress, 0)
for _, address := range addresses {
knownNodeName, foundKnown := readyKnownNodes[address.IP]
_, foundUnknown := readyUnknownNodes[address.IP]
if foundKnown && foundUnknown && address.TargetRef.Name != knownNodeName {
knownNodeName, foundKnown := readyKnownNodes[address.ipAddress]
_, foundUnknown := readyUnknownNodes[address.ipAddress]
if foundKnown && foundUnknown && address.name != knownNodeName {
continue
}
addressesFinal = append(addressesFinal, address)
}
return addressesFinal, errs
}
func (c *Controller) syncNodeEndpointsWithLogError(ctx context.Context) {
func (c *Controller) sync(ctx context.Context) {
c.logger.Debug("Synchronizing nodes")
c.nodeEndpointSyncs.Inc()
err := c.syncNodeEndpoints(ctx)
//TODO(simonpasquier): add failed/attempted counters.
nodeList, err := c.kclient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: c.kubeletSelector})
if err != nil {
c.nodeEndpointSyncErrors.Inc()
c.logger.Error("Failed to synchronize nodes", "err", err)
c.logger.Error("Failed to list nodes", "err", err)
return
}
// Sort the nodes slice by their name.
nodes := nodeList.Items
slices.SortStableFunc(nodes, func(a, b v1.Node) int {
return strings.Compare(a.Name, b.Name)
})
c.logger.Debug("Nodes retrieved from the Kubernetes API", "num_nodes", len(nodes))
addresses, errs := c.getNodeAddresses(nodes)
if len(errs) > 0 {
for _, err := range errs {
c.logger.Warn(err.Error())
}
c.nodeAddressLookupErrors.Add(float64(len(errs)))
}
c.logger.Debug("Nodes converted to endpoint addresses", "num_addresses", len(addresses))
svc, err := c.syncService(ctx)
if err != nil {
c.logger.Error("Failed to synchronize kubelet service", "err", err)
}
if c.manageEndpoints {
c.nodeEndpointSyncs.WithLabelValues(endpointsLabel).Inc()
if err = c.syncEndpoints(ctx, addresses); err != nil {
c.nodeEndpointSyncErrors.WithLabelValues(endpointsLabel).Inc()
c.logger.Error("Failed to synchronize kubelet endpoints", "err", err)
}
}
if c.manageEndpointSlice {
c.nodeEndpointSyncs.WithLabelValues(endpointSliceLabel).Inc()
if err = c.syncEndpointSlice(ctx, svc, addresses); err != nil {
c.nodeEndpointSyncErrors.WithLabelValues(endpointSliceLabel).Inc()
c.logger.Error("Failed to synchronize kubelet endpointslice", "err", err)
}
}
}
func (c *Controller) syncNodeEndpoints(ctx context.Context) error {
func (c *Controller) syncEndpoints(ctx context.Context, addresses []nodeAddress) error {
c.logger.Debug("Sync endpoints")
eps := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: c.kubeletObjectName,
@@ -234,41 +421,46 @@ func (c *Controller) syncNodeEndpoints(ctx context.Context) error {
},
Subsets: []v1.EndpointSubset{
{
Addresses: make([]v1.EndpointAddress, len(addresses)),
Ports: []v1.EndpointPort{
{
Name: "https-metrics",
Port: 10250,
Name: httpsPortName,
Port: httpsPort,
},
{
Name: "http-metrics",
Port: 10255,
Name: httpPortName,
Port: httpPort,
},
{
Name: "cadvisor",
Port: 4194,
Name: cAdvisorPortName,
Port: cAdvisorPort,
},
},
},
},
}
nodes, err := c.kclient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: c.kubeletSelector})
if c.manageEndpointSlice {
// Tell the endpointslice mirroring controller that it shouldn't manage
// the endpoints object since this controller is in charge.
eps.ObjectMeta.Labels[discoveryv1.LabelSkipMirror] = "true"
}
for i, na := range addresses {
eps.Subsets[0].Addresses[i] = na.v1EndpointAddress()
}
c.logger.Debug("Updating Kubernetes endpoint")
err := k8sutil.CreateOrUpdateEndpoints(ctx, c.kclient.CoreV1().Endpoints(c.kubeletObjectNamespace), eps)
if err != nil {
return fmt.Errorf("listing nodes failed: %w", err)
return err
}
c.logger.Debug("Nodes retrieved from the Kubernetes API", "num_nodes", len(nodes.Items))
return nil
}
addresses, errs := c.getNodeAddresses(nodes)
if len(errs) > 0 {
for _, err := range errs {
c.logger.Warn("", "err", err)
}
c.nodeAddressLookupErrors.Add(float64(len(errs)))
}
c.logger.Debug("Nodes converted to endpoint addresses", "num_addresses", len(addresses))
eps.Subsets[0].Addresses = addresses
func (c *Controller) syncService(ctx context.Context) (*v1.Service, error) {
c.logger.Debug("Sync service")
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
@@ -282,35 +474,206 @@ func (c *Controller) syncNodeEndpoints(ctx context.Context) error {
},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: "None",
ClusterIP: v1.ClusterIPNone,
Ports: []v1.ServicePort{
{
Name: "https-metrics",
Port: 10250,
Name: httpsPortName,
Port: httpsPort,
},
{
Name: "http-metrics",
Port: 10255,
Name: httpPortName,
Port: httpPort,
},
{
Name: "cadvisor",
Port: 4194,
Name: cAdvisorPortName,
Port: cAdvisorPort,
},
},
},
}
c.logger.Debug("Updating Kubernetes service")
err = k8sutil.CreateOrUpdateService(ctx, c.kclient.CoreV1().Services(c.kubeletObjectNamespace), svc)
c.logger.Debug("Updating Kubernetes service", "service", c.kubeletObjectName)
return k8sutil.CreateOrUpdateService(ctx, c.kclient.CoreV1().Services(c.kubeletObjectNamespace), svc)
}
func (c *Controller) syncEndpointSlice(ctx context.Context, svc *v1.Service, addresses []nodeAddress) error {
c.logger.Debug("Sync endpointslice")
// Get the list of endpointslice objects associated to the service.
client := c.kclient.DiscoveryV1().EndpointSlices(c.kubeletObjectNamespace)
l, err := client.List(ctx, metav1.ListOptions{
LabelSelector: labels.Set{discoveryv1.LabelServiceName: c.kubeletObjectName}.String(),
})
if err != nil {
return fmt.Errorf("synchronizing kubelet service object failed: %w", err)
return fmt.Errorf("failed to list endpointslice: %w", err)
}
c.logger.Debug("Updating Kubernetes endpoint")
err = k8sutil.CreateOrUpdateEndpoints(ctx, c.kclient.CoreV1().Endpoints(c.kubeletObjectNamespace), eps)
if err != nil {
return fmt.Errorf("synchronizing kubelet endpoints object failed: %w", err)
epsl := []discoveryv1.EndpointSlice{}
if len(l.Items) > 0 {
epsl = l.Items
}
nodeAddressIdx := make(map[string]nodeAddress, len(addresses))
for _, a := range addresses {
nodeAddressIdx[a.ipAddress] = a
}
// Iterate over the existing endpoints to update their state or remove them
// if the IP address isn't associated to a node anymore.
for i, eps := range epsl {
endpoints := make([]discoveryv1.Endpoint, 0, len(eps.Endpoints))
for _, ep := range eps.Endpoints {
if len(ep.Addresses) != 1 {
c.logger.Warn("Got more than 1 address for the endpoint", "name", eps.Name, "num", len(ep.Addresses))
continue
}
a, found := nodeAddressIdx[ep.Addresses[0]]
if !found {
// The node doesn't exist anymore.
continue
}
endpoints = append(endpoints, a.discoveryV1Endpoint())
delete(nodeAddressIdx, a.ipAddress)
}
epsl[i].Endpoints = endpoints
}
// Append new nodes into the existing endpointslices.
for _, a := range addresses {
if _, found := nodeAddressIdx[a.ipAddress]; !found {
// Already processed.
continue
}
for i := range epsl {
if a.ipv4 != (epsl[i].AddressType == discoveryv1.AddressTypeIPv4) {
// Not the same address type.
continue
}
if len(epsl[i].Endpoints) >= c.maxEndpointsPerSlice {
// The endpoints slice is full.
continue
}
epsl[i].Endpoints = append(epsl[i].Endpoints, a.discoveryV1Endpoint())
delete(nodeAddressIdx, a.ipAddress)
break
}
}
// Create new endpointslice object(s) for the new nodes which couldn't be
// appended to the existing endpointslices.
var (
ipv4Eps *discoveryv1.EndpointSlice
ipv6Eps *discoveryv1.EndpointSlice
)
for _, a := range addresses {
if _, found := nodeAddressIdx[a.ipAddress]; !found {
// Already processed.
continue
}
if ipv4Eps != nil && c.fullCapacity(ipv4Eps.Endpoints) {
epsl = append(epsl, *ipv4Eps)
ipv4Eps = nil
}
if ipv6Eps != nil && c.fullCapacity(ipv6Eps.Endpoints) {
epsl = append(epsl, *ipv6Eps)
ipv6Eps = nil
}
eps := ipv4Eps
if !a.ipv4 {
eps = ipv6Eps
}
if eps == nil {
eps = &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
GenerateName: c.kubeletObjectName + "-",
Annotations: c.annotations,
Labels: c.labels.Merge(map[string]string{
discoveryv1.LabelServiceName: c.kubeletObjectName,
discoveryv1.LabelManagedBy: "prometheus-operator",
"k8s-app": "kubelet",
"app.kubernetes.io/name": "kubelet",
"app.kubernetes.io/managed-by": "prometheus-operator",
}),
OwnerReferences: []metav1.OwnerReference{{
APIVersion: "v1",
BlockOwnerDeletion: ptr.To(true),
Controller: ptr.To(true),
Kind: "Service",
Name: c.kubeletObjectName,
UID: svc.UID,
},
},
},
Ports: []discoveryv1.EndpointPort{
{
Name: ptr.To(httpsPortName),
Port: ptr.To(httpsPort),
},
{
Name: ptr.To(httpPortName),
Port: ptr.To(httpPort),
},
{
Name: ptr.To(cAdvisorPortName),
Port: ptr.To(cAdvisorPort),
},
},
}
if a.ipv4 {
eps.AddressType = discoveryv1.AddressTypeIPv4
ipv4Eps = eps
} else {
eps.AddressType = discoveryv1.AddressTypeIPv6
ipv6Eps = eps
}
}
eps.Endpoints = append(eps.Endpoints, a.discoveryV1Endpoint())
delete(nodeAddressIdx, a.ipAddress)
}
if ipv4Eps != nil {
epsl = append(epsl, *ipv4Eps)
}
if ipv6Eps != nil {
epsl = append(epsl, *ipv6Eps)
}
for _, eps := range epsl {
if len(eps.Endpoints) == 0 {
c.logger.Debug("Deleting endpointslice object", "name", eps.Name)
err := client.Delete(ctx, eps.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete endpoinslice: %w", err)
}
continue
}
c.logger.Debug("Updating endpointslice object", "name", eps.Name)
err := k8sutil.CreateOrUpdateEndpointSlice(ctx, client, &eps)
if err != nil {
return fmt.Errorf("failed to update endpoinslice: %w", err)
}
}
return nil
}
func (c *Controller) fullCapacity(eps []discoveryv1.Endpoint) bool {
return len(eps) >= c.maxEndpointsPerSlice
}


@@ -15,40 +15,52 @@
package kubelet
import (
"context"
"fmt"
"log/slog"
"slices"
"strings"
"testing"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/kubernetes/fake"
clientdiscoveryv1 "k8s.io/client-go/kubernetes/typed/discovery/v1"
ktesting "k8s.io/client-go/testing"
logging "github.com/prometheus-operator/prometheus-operator/internal/log"
)
func TestGetNodeAddresses(t *testing.T) {
for _, c := range []struct {
name string
nodes *v1.NodeList
nodes []v1.Node
expectedAddresses []string
expectedErrors int
}{
{
name: "simple",
nodes: &v1.NodeList{
Items: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
nodes: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
@@ -60,43 +72,41 @@ func TestGetNodeAddresses(t *testing.T) {
{
// Replicates #1815
name: "missing ip on one node",
nodes: &v1.NodeList{
Items: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "node-0",
Type: v1.NodeHostName,
},
nodes: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "node-0",
Type: v1.NodeHostName,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
@@ -107,62 +117,60 @@ func TestGetNodeAddresses(t *testing.T) {
},
{
name: "not ready node unique ip",
nodes: &v1.NodeList{
Items: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
nodes: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.2",
Type: v1.NodeInternalIP,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.2",
Type: v1.NodeInternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.3",
Type: v1.NodeInternalIP,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.3",
Type: v1.NodeInternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
},
},
},
@@ -173,62 +181,60 @@ func TestGetNodeAddresses(t *testing.T) {
},
{
name: "not ready node duplicate ip",
nodes: &v1.NodeList{
Items: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
nodes: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.1",
Type: v1.NodeInternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.3",
Type: v1.NodeInternalIP,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "10.0.0.3",
Type: v1.NodeInternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
},
},
},
@@ -241,6 +247,7 @@ func TestGetNodeAddresses(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
controller := Controller{
nodeAddressPriority: "internal",
logger: newLogger(),
}
addrs, errs := controller.getNodeAddresses(c.nodes)
@@ -251,51 +258,49 @@
}
func TestNodeAddressPriority(t *testing.T) {
nodes := &v1.NodeList{
Items: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "192.168.0.100",
Type: v1.NodeInternalIP,
},
{
Address: "203.0.113.100",
Type: v1.NodeExternalIP,
},
nodes := []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "192.168.0.100",
Type: v1.NodeInternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
{
Address: "203.0.113.100",
Type: v1.NodeExternalIP,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "104.27.131.189",
Type: v1.NodeExternalIP,
},
{
Address: "192.168.1.100",
Type: v1.NodeInternalIP,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: "104.27.131.189",
Type: v1.NodeExternalIP,
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
{
Address: "192.168.1.100",
Type: v1.NodeInternalIP,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
@@ -304,6 +309,7 @@ func TestNodeAddressPriority(t *testing.T) {
internalC := Controller{
nodeAddressPriority: "internal",
logger: newLogger(),
}
actualAddresses, errs := internalC.getNodeAddresses(nodes)
require.Empty(t, errs)
@@ -312,6 +318,7 @@ func TestNodeAddressPriority(t *testing.T) {
externalC := Controller{
nodeAddressPriority: "external",
logger: newLogger(),
}
actualAddresses, errs = externalC.getNodeAddresses(nodes)
require.Empty(t, errs)
@@ -319,11 +326,250 @@ func TestNodeAddressPriority(t *testing.T) {
checkNodeAddresses(t, actualAddresses, expectedAddresses)
}
func checkNodeAddresses(t *testing.T, actualAddresses []v1.EndpointAddress, expectedAddresses []string) {
ips := make([]string, 0)
func checkNodeAddresses(t *testing.T, actualAddresses []nodeAddress, expectedAddresses []string) {
ips := make([]string, 0, len(actualAddresses))
for _, addr := range actualAddresses {
ips = append(ips, addr.IP)
ips = append(ips, addr.ipAddress)
}
require.Equal(t, expectedAddresses, ips)
}
func TestSync(t *testing.T) {
var (
ctx = context.Background()
id = int32(0)
fakeClient = fake.NewClientset()
)
fakeClient.PrependReactor(
"create", "*",
func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
ret = action.(ktesting.CreateAction).GetObject()
meta, ok := ret.(metav1.Object)
if !ok {
return
}
if meta.GetName() == "" && meta.GetGenerateName() != "" {
meta.SetName(names.SimpleNameGenerator.GenerateName(meta.GetGenerateName()))
meta.SetUID(types.UID(string('A' + id)))
id++
}
return
},
)
c, err := New(
newLogger(),
fakeClient,
nil,
"kubelet",
"test",
"",
nil,
nil,
WithEndpoints(), WithEndpointSlice(), WithMaxEndpointsPerSlice(2), WithNodeAddressPriority("internal"),
)
require.NoError(t, err)
var (
nclient = c.kclient.CoreV1().Nodes()
sclient = c.kclient.CoreV1().Services(c.kubeletObjectNamespace)
eclient = c.kclient.CoreV1().Endpoints(c.kubeletObjectNamespace)
esclient = c.kclient.DiscoveryV1().EndpointSlices(c.kubeletObjectNamespace)
)
t.Run("no nodes", func(t *testing.T) {
c.sync(ctx)
svc, err := sclient.Get(ctx, c.kubeletObjectName, metav1.GetOptions{})
require.NoError(t, err)
require.NotNil(t, svc)
ep, err := eclient.Get(ctx, c.kubeletObjectName, metav1.GetOptions{})
require.NoError(t, err)
require.Len(t, ep.Subsets, 1)
require.Empty(t, ep.Subsets[0].Addresses)
_ = listEndpointSlices(t, esclient, 0)
})
t.Run("add 1 ipv4 node", func(t *testing.T) {
_, _ = nclient.Create(ctx, newNode("node-0", "10.0.0.1"), metav1.CreateOptions{})
c.sync(ctx)
ep, err := eclient.Get(ctx, c.kubeletObjectName, metav1.GetOptions{})
require.NoError(t, err)
require.Len(t, ep.Subsets, 1)
require.Len(t, ep.Subsets[0].Addresses, 1)
require.Equal(t, "10.0.0.1", ep.Subsets[0].Addresses[0].IP)
eps := listEndpointSlices(t, esclient, 1)
require.Equal(t, discoveryv1.AddressType("IPv4"), eps[0].AddressType)
require.Len(t, eps[0].Endpoints, 1)
require.Len(t, eps[0].Endpoints[0].Addresses, 1)
require.Equal(t, "10.0.0.1", eps[0].Endpoints[0].Addresses[0])
})
t.Run("add 4 IPv4 nodes and 1 IPv6 node", func(t *testing.T) {
for _, n := range [][2]string{
{"node-1", "fc00:f853:ccd:e793::1"},
{"node-2", "10.0.0.2"},
{"node-3", "10.0.0.3"},
{"node-4", "10.0.0.4"},
{"node-5", "10.0.0.5"},
} {
_, _ = nclient.Create(ctx, newNode(n[0], n[1]), metav1.CreateOptions{})
}
c.sync(ctx)
ep, err := eclient.Get(ctx, c.kubeletObjectName, metav1.GetOptions{})
require.NoError(t, err)
require.Len(t, ep.Subsets[0].Addresses, 6)
for i, a := range []string{
"10.0.0.1",
"fc00:f853:ccd:e793::1",
"10.0.0.2",
"10.0.0.3",
"10.0.0.4",
"10.0.0.5",
} {
require.Equal(t, a, ep.Subsets[0].Addresses[i].IP)
}
eps := listEndpointSlices(t, esclient, 4)
i := 0
for _, ep := range eps {
if ep.AddressType == discoveryv1.AddressType("IPv6") {
require.Len(t, ep.Endpoints, 1)
require.Len(t, ep.Endpoints[0].Addresses, 1)
require.Equal(t, "fc00:f853:ccd:e793::1", ep.Endpoints[0].Addresses[0])
continue
}
switch i {
case 0:
require.Len(t, ep.Endpoints, 2)
require.Equal(t, "10.0.0.1", ep.Endpoints[0].Addresses[0])
require.Equal(t, "10.0.0.2", ep.Endpoints[1].Addresses[0])
case 1:
require.Len(t, ep.Endpoints, 2)
require.Equal(t, "10.0.0.3", ep.Endpoints[0].Addresses[0])
require.Equal(t, "10.0.0.4", ep.Endpoints[1].Addresses[0])
case 2:
require.Len(t, ep.Endpoints, 1)
require.Equal(t, "10.0.0.5", ep.Endpoints[0].Addresses[0])
}
i++
}
})
t.Run("delete 1 IPv4 node and 1 IPv6 node", func(t *testing.T) {
for _, n := range []string{"node-1", "node-3"} {
_ = nclient.Delete(ctx, n, metav1.DeleteOptions{})
}
c.sync(ctx)
ep, err := eclient.Get(ctx, c.kubeletObjectName, metav1.GetOptions{})
require.NoError(t, err)
require.Len(t, ep.Subsets[0].Addresses, 4)
for i, a := range []string{
"10.0.0.1",
"10.0.0.2",
"10.0.0.4",
"10.0.0.5",
} {
require.Equal(t, a, ep.Subsets[0].Addresses[i].IP)
}
eps := listEndpointSlices(t, esclient, 3)
for i, ep := range eps {
require.Equal(t, discoveryv1.AddressType("IPv4"), ep.AddressType)
switch i {
case 0:
require.Len(t, ep.Endpoints, 2)
require.Equal(t, "10.0.0.1", ep.Endpoints[0].Addresses[0])
require.Equal(t, "10.0.0.2", ep.Endpoints[1].Addresses[0])
case 1:
require.Len(t, ep.Endpoints, 1)
require.Equal(t, "10.0.0.4", ep.Endpoints[0].Addresses[0])
case 2:
require.Len(t, ep.Endpoints, 1)
require.Equal(t, "10.0.0.5", ep.Endpoints[0].Addresses[0])
}
}
})
t.Run("delete all nodes", func(t *testing.T) {
for _, n := range []string{"node-0", "node-2", "node-4", "node-5"} {
_ = nclient.Delete(ctx, n, metav1.DeleteOptions{})
}
c.sync(ctx)
ep, err := eclient.Get(ctx, c.kubeletObjectName, metav1.GetOptions{})
require.NoError(t, err)
require.Empty(t, ep.Subsets[0].Addresses)
_ = listEndpointSlices(t, esclient, 0)
})
}
func newNode(name, address string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name + "-" + address),
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Address: address,
Type: v1.NodeInternalIP,
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
}
}
func listEndpointSlices(t *testing.T, c clientdiscoveryv1.EndpointSliceInterface, expected int) []discoveryv1.EndpointSlice {
t.Helper()
eps, err := c.List(context.Background(), metav1.ListOptions{})
require.NoError(t, err)
require.Len(t, eps.Items, expected)
slices.SortStableFunc(eps.Items, func(a, b discoveryv1.EndpointSlice) int {
return strings.Compare(string(a.UID), string(b.UID))
})
return eps.Items
}
func newLogger() *slog.Logger {
l, err := logging.NewLoggerSlog(logging.Config{
Level: logging.LevelWarn,
Format: logging.FormatLogFmt,
})
if err != nil {
panic(fmt.Sprintf("failed to create logger: %v", err))
}
return l
}


@@ -716,7 +716,7 @@ func (c *Operator) syncStatefulSet(ctx context.Context, key string, p *monitorin
// Create governing service if it doesn't exist.
svcClient := c.kclient.CoreV1().Services(p.Namespace)
if err := k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(p, c.config)); err != nil {
if _, err := k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(p, c.config)); err != nil {
return fmt.Errorf("synchronizing governing service failed: %w", err)
}


@@ -785,7 +785,7 @@ func (c *Operator) sync(ctx context.Context, key string) error {
// Create governing service if it doesn't exist.
svcClient := c.kclient.CoreV1().Services(p.Namespace)
if err := k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(p, c.config)); err != nil {
if _, err := k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(p, c.config)); err != nil {
return fmt.Errorf("synchronizing governing service failed: %w", err)
}


@@ -478,7 +478,7 @@ func (o *Operator) sync(ctx context.Context, key string) error {
// Create governing service if it doesn't exist.
svcClient := o.kclient.CoreV1().Services(tr.Namespace)
if err = k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(tr, o.config)); err != nil {
if _, err = k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(tr, o.config)); err != nil {
return fmt.Errorf("synchronizing governing service failed: %w", err)
}