1
0
Fork 0
mirror of https://github.com/prometheus-operator/prometheus-operator.git synced 2025-04-21 19:49:46 +00:00

Merge pull request from s-urbaniak/remove-mlw

remove multilistwatcher and denylistfilter
This commit is contained in:
Sergiusz Urbaniak 2020-09-08 07:34:39 +02:00 committed by GitHub
commit 289ee029ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1086 additions and 652 deletions

View file

@ -23,8 +23,8 @@ import (
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
"github.com/prometheus-operator/prometheus-operator/pkg/informers"
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
"github.com/prometheus-operator/prometheus-operator/pkg/listwatch"
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
prometheusoperator "github.com/prometheus-operator/prometheus-operator/pkg/prometheus"
@ -38,9 +38,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@ -57,8 +55,8 @@ type Operator struct {
mclient monitoringclient.Interface
logger log.Logger
alrtInf cache.SharedIndexInformer
ssetInf cache.SharedIndexInformer
alrtInfs *informers.ForResource
ssetInfs *informers.ForResource
queue workqueue.RateLimitingInterface
@ -117,34 +115,35 @@ func New(ctx context.Context, c prometheusoperator.Config, logger log.Logger, r
},
}
o.alrtInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.AlertmanagerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = o.config.AlertManagerSelector
return o.mclient.MonitoringV1().Alertmanagers(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = o.config.AlertManagerSelector
return o.mclient.MonitoringV1().Alertmanagers(namespace).Watch(ctx, options)
},
}
}),
o.alrtInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
o.config.Namespaces.AlertmanagerAllowList,
o.config.Namespaces.DenyList,
mclient,
resyncPeriod,
func(options *metav1.ListOptions) {
options.LabelSelector = o.config.AlertManagerSelector
},
),
&monitoringv1.Alertmanager{}, resyncPeriod, cache.Indexers{},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.AlertmanagerName),
)
o.metrics.MustRegister(NewAlertmanagerCollector(o.alrtInf.GetStore()))
o.metrics.MustRegister(operator.NewStoreCollector("alertmanager", o.alrtInf.GetStore()))
if err != nil {
return nil, errors.Wrap(err, "error creating alertmanager informers")
}
o.ssetInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.AlertmanagerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return cache.NewListWatchFromClient(o.kclient.AppsV1().RESTClient(), "statefulsets", namespace, fields.Everything())
}),
o.ssetInfs, err = informers.NewInformersForResource(
informers.NewKubeInformerFactories(
o.config.Namespaces.AlertmanagerAllowList,
o.config.Namespaces.DenyList,
o.kclient,
resyncPeriod,
nil,
),
&appsv1.StatefulSet{}, resyncPeriod, cache.Indexers{},
appsv1.SchemeGroupVersion.WithResource("statefulsets"),
)
if err != nil {
return nil, errors.Wrap(err, "error creating statefulset informers")
}
return o, nil
}
@ -152,21 +151,24 @@ func New(ctx context.Context, c prometheusoperator.Config, logger log.Logger, r
// waitForCacheSync waits for the informers' caches to be synced.
func (c *Operator) waitForCacheSync(stopc <-chan struct{}) error {
ok := true
informers := []struct {
name string
informer cache.SharedIndexInformer
for _, infs := range []struct {
name string
informersForResource *informers.ForResource
}{
{"Alertmanager", c.alrtInf},
{"StatefulSet", c.ssetInf},
}
for _, inf := range informers {
if !cache.WaitForCacheSync(stopc, inf.informer.HasSynced) {
level.Error(c.logger).Log("msg", fmt.Sprintf("failed to sync %s cache", inf.name))
ok = false
} else {
level.Debug(c.logger).Log("msg", fmt.Sprintf("successfully synced %s cache", inf.name))
{"Alertmanager", c.alrtInfs},
{"StatefulSet", c.ssetInfs},
} {
for _, inf := range infs.informersForResource.GetInformers() {
if !cache.WaitForCacheSync(stopc, inf.Informer().HasSynced) {
level.Error(c.logger).Log("msg", fmt.Sprintf("failed to sync %s cache", infs.name))
ok = false
} else {
level.Debug(c.logger).Log("msg", fmt.Sprintf("successfully synced %s cache", infs.name))
}
}
}
if !ok {
return errors.New("failed to sync caches")
}
@ -176,12 +178,12 @@ func (c *Operator) waitForCacheSync(stopc <-chan struct{}) error {
// addHandlers adds the eventhandlers to the informers.
func (c *Operator) addHandlers() {
c.alrtInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.alrtInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleAlertmanagerAdd,
DeleteFunc: c.handleAlertmanagerDelete,
UpdateFunc: c.handleAlertmanagerUpdate,
})
c.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.ssetInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleStatefulSetAdd,
DeleteFunc: c.handleStatefulSetDelete,
UpdateFunc: c.handleStatefulSetUpdate,
@ -215,8 +217,8 @@ func (c *Operator) Run(ctx context.Context) error {
go c.worker(ctx)
go c.alrtInf.Run(ctx.Done())
go c.ssetInf.Run(ctx.Done())
go c.alrtInfs.Start(ctx.Done())
go c.ssetInfs.Start(ctx.Done())
if err := c.waitForCacheSync(ctx.Done()); err != nil {
return err
}
@ -303,14 +305,16 @@ func (c *Operator) alertmanagerForStatefulSet(sset interface{}) *monitoringv1.Al
}
aKey := statefulSetKeyToAlertmanagerKey(key)
a, exists, err := c.alrtInf.GetStore().GetByKey(aKey)
a, err := c.alrtInfs.Get(aKey)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
level.Error(c.logger).Log("msg", "Alertmanager lookup failed", "err", err)
return nil
}
if !exists {
return nil
}
return a.(*monitoringv1.Alertmanager)
}
@ -398,16 +402,17 @@ func (c *Operator) handleStatefulSetUpdate(oldo, curo interface{}) {
}
func (c *Operator) sync(ctx context.Context, key string) error {
obj, exists, err := c.alrtInf.GetIndexer().GetByKey(key)
if err != nil {
return err
}
if !exists {
aobj, err := c.alrtInfs.Get(key)
if apierrors.IsNotFound(err) {
// Dependent resources are cleaned up by K8s via OwnerReferences
return nil
}
if err != nil {
return err
}
am := obj.(*monitoringv1.Alertmanager)
am := aobj.(*monitoringv1.Alertmanager)
am = am.DeepCopy()
am.APIVersion = monitoringv1.SchemeGroupVersion.String()
am.Kind = monitoringv1.AlertmanagersKind
@ -426,12 +431,13 @@ func (c *Operator) sync(ctx context.Context, key string) error {
ssetClient := c.kclient.AppsV1().StatefulSets(am.Namespace)
// Ensure we have a StatefulSet running Alertmanager deployed.
obj, exists, err = c.ssetInf.GetIndexer().GetByKey(alertmanagerKeyToStatefulSetKey(key))
if err != nil {
obj, err := c.ssetInfs.Get(alertmanagerKeyToStatefulSetKey(key))
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "retrieving statefulset failed")
}
if !exists {
if apierrors.IsNotFound(err) {
sset, err := makeStatefulSet(am, nil, c.config)
if err != nil {
return errors.Wrap(err, "making the statefulset, to create, failed")

179
pkg/informers/informers.go Normal file
View file

@ -0,0 +1,179 @@
// Copyright 2020 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package informers
import (
"github.com/pkg/errors"
"github.com/prometheus-operator/prometheus-operator/pkg/listwatch"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
)
// InformLister is the interface that both exposes a shared index informer
// and a generic lister.
// Usually generated clients declare this interface as "GenericInformer".
type InformLister interface {
Informer() cache.SharedIndexInformer
Lister() cache.GenericLister
}
// FactoriesForNamespaces is a way to combine several shared informers into a single struct with unified listing power.
type FactoriesForNamespaces interface {
ForResource(namespace string, resource schema.GroupVersionResource) (InformLister, error)
Namespaces() sets.String
}
// ForResource contains a slice of InformLister for a concrete resource type,
// one per namespace.
type ForResource struct {
informers []InformLister
}
// NewInformersForResource returns a composite informer exposing the most basic set of operations
// needed from informers and listers. It does not implement a formal interface,
// but exposes a minimal set of methods from underlying slice of cache.SharedIndexInformers and cache.GenericListers.
//
// It takes a namespace aware informer factory, wrapped in a FactoriesForNamespaces interface
// that is able to instantiate an informer for a given namespace.
func NewInformersForResource(ifs FactoriesForNamespaces, resource schema.GroupVersionResource) (*ForResource, error) {
namespaces := ifs.Namespaces().List()
var informers []InformLister
for _, ns := range namespaces {
informer, err := ifs.ForResource(ns, resource)
if err != nil {
return nil, errors.Wrapf(err, "error getting informer in namespace %q for resource %v", ns, resource)
}
informers = append(informers, informer)
}
return &ForResource{
informers: informers,
}, nil
}
// Start starts all underlying informers, passing the given stop channel to each of them.
func (w *ForResource) Start(stopCh <-chan struct{}) {
for _, i := range w.informers {
go i.Informer().Run(stopCh)
}
}
// GetInformers returns all wrapped informers.
func (w *ForResource) GetInformers() []InformLister {
return w.informers
}
// AddEventHandler registers the given handler to all wrapped informers.
func (w *ForResource) AddEventHandler(handler cache.ResourceEventHandler) {
for _, i := range w.informers {
i.Informer().AddEventHandler(handler)
}
}
// HasSynced returns true if all underlying informers have synced, else false.
func (w *ForResource) HasSynced() bool {
for _, i := range w.informers {
if !i.Informer().HasSynced() {
return false
}
}
return true
}
// List lists based on the requested selector.
// It invokes all wrapped informers, the result is concatenated.
func (w *ForResource) List(selector labels.Selector) ([]runtime.Object, error) {
var ret []runtime.Object
for _, inf := range w.informers {
objs, err := inf.Lister().List(selector)
if err != nil {
return nil, err
}
ret = append(ret, objs...)
}
return ret, nil
}
// ListAllByNamespace invokes all wrapped informers passing the same appendFn.
// While wrapped informers are usually namespace aware, it is still important to iterate over all of them
// as some informers might wrap k8s.io/apimachinery/pkg/apis/meta/v1.NamespaceAll.
func (w *ForResource) ListAllByNamespace(namespace string, selector labels.Selector, appendFn cache.AppendFunc) error {
for _, inf := range w.informers {
err := cache.ListAllByNamespace(inf.Informer().GetIndexer(), namespace, selector, appendFn)
if err != nil {
return err
}
}
return nil
}
// Get invokes all wrapped informers and returns the first found runtime object.
// It returns the first ocured error.
func (w *ForResource) Get(name string) (runtime.Object, error) {
var err error
for _, inf := range w.informers {
var ret runtime.Object
ret, err = inf.Lister().Get(name)
if apierrors.IsNotFound(err) {
continue
}
if err != nil {
return nil, err
}
return ret, nil
}
return nil, err
}
// newInformerOptions returns a list option tweak function and a list of namespaces
// based on the given allowed and denied namespaces.
//
// If allowedNamespaces contains one only entry equal to k8s.io/apimachinery/pkg/apis/meta/v1.NamespaceAll
// then it returns it and a tweak function filtering denied namespaces using a field selector.
//
// Else, denied namespaces are ignored and just the set of allowed namespaces is returned.
func newInformerOptions(allowedNamespaces, deniedNamespaces map[string]struct{}, tweaks func(*v1.ListOptions)) (func(*v1.ListOptions), []string) {
if tweaks == nil {
tweaks = func(*v1.ListOptions) {} // nop
}
var namespaces []string
if listwatch.IsAllNamespaces(allowedNamespaces) {
return func(options *v1.ListOptions) {
tweaks(options)
listwatch.DenyTweak(options, "metadata.namespace", deniedNamespaces)
}, []string{v1.NamespaceAll}
}
for ns := range allowedNamespaces {
namespaces = append(namespaces, ns)
}
return tweaks, namespaces
}

View file

@ -0,0 +1,247 @@
// Copyright 2020 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package informers
import (
"reflect"
"sort"
"strings"
"testing"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
)
type mockFactory struct {
namespaces sets.String
objects map[string]runtime.Object
}
func (m *mockFactory) List(selector labels.Selector) (ret []runtime.Object, err error) {
panic("implement me")
}
func (m *mockFactory) Get(name string) (runtime.Object, error) {
if obj, ok := m.objects[name]; ok {
return obj, nil
}
return nil, errors.NewNotFound(schema.GroupResource{}, name)
}
func (m *mockFactory) ByNamespace(namespace string) cache.GenericNamespaceLister {
panic("not implemented")
}
func (m *mockFactory) Informer() cache.SharedIndexInformer {
panic("not implemented")
}
func (m *mockFactory) Lister() cache.GenericLister {
return m
}
func (m *mockFactory) ForResource(namespace string, resource schema.GroupVersionResource) (InformLister, error) {
return m, nil
}
func (m *mockFactory) Namespaces() sets.String {
return m.namespaces
}
func TestInformers(t *testing.T) {
t.Run("TestGet", func(t *testing.T) {
ifs, err := NewInformersForResource(
&mockFactory{
namespaces: sets.NewString("foo", "bar"),
objects: map[string]runtime.Object{
"foo": &monitoringv1.Prometheus{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
},
},
},
schema.GroupVersionResource{},
)
if err != nil {
t.Error(err)
return
}
_, err = ifs.Get("foo")
if err != nil {
t.Error(err)
return
}
_, err = ifs.Get("bar")
if !errors.IsNotFound(err) {
t.Errorf("expected IsNotFound error, got %v", err)
return
}
})
}
func TestNewInformerOptions(t *testing.T) {
for _, tc := range []struct {
name string
allowedNamespaces, deniedNamespaces map[string]struct{}
tweaks func(*v1.ListOptions)
expectedOptions v1.ListOptions
expectedNamespaces []string
}{
{
name: "all unset",
expectedOptions: v1.ListOptions{},
expectedNamespaces: nil,
},
{
name: "allowed namespaces",
allowedNamespaces: map[string]struct{}{
"foo": {},
"bar": {},
},
expectedOptions: v1.ListOptions{},
expectedNamespaces: []string{
"foo",
"bar",
},
},
{
name: "allowed namespaces with a tweak",
allowedNamespaces: map[string]struct{}{
"foo": {},
"bar": {},
},
tweaks: func(options *v1.ListOptions) {
options.FieldSelector = "metadata.name=foo"
},
expectedOptions: v1.ListOptions{
FieldSelector: "metadata.name=foo",
},
expectedNamespaces: []string{
"foo",
"bar",
},
},
{
name: "allowed and ignored denied namespaces",
allowedNamespaces: map[string]struct{}{
"foo": {},
"bar": {},
},
deniedNamespaces: map[string]struct{}{
"denied1": {},
"denied2": {},
},
expectedOptions: v1.ListOptions{},
expectedNamespaces: []string{
"foo",
"bar",
},
},
{
name: "one allowed namespace and ignored denied namespaces",
allowedNamespaces: map[string]struct{}{
"foo": {},
},
deniedNamespaces: map[string]struct{}{
"denied1": {},
"denied2": {},
},
expectedOptions: v1.ListOptions{},
expectedNamespaces: []string{
"foo",
},
},
{
name: "all allowed namespaces denying namespaces",
allowedNamespaces: map[string]struct{}{
v1.NamespaceAll: {},
},
deniedNamespaces: map[string]struct{}{
"denied2": {},
"denied1": {},
},
expectedNamespaces: []string{
v1.NamespaceAll,
},
expectedOptions: v1.ListOptions{
FieldSelector: "metadata.namespace!=denied1,metadata.namespace!=denied2",
},
},
{
name: "denied namespaces with tweak",
allowedNamespaces: map[string]struct{}{
v1.NamespaceAll: {},
},
deniedNamespaces: map[string]struct{}{
"denied2": {},
"denied1": {},
},
tweaks: func(options *v1.ListOptions) {
options.FieldSelector = "metadata.name=foo"
},
expectedNamespaces: []string{
v1.NamespaceAll,
},
expectedOptions: v1.ListOptions{
FieldSelector: "metadata.name=foo,metadata.namespace!=denied1,metadata.namespace!=denied2",
},
},
} {
t.Run(tc.name, func(t *testing.T) {
tweaks, namespaces := newInformerOptions(tc.allowedNamespaces, tc.deniedNamespaces, tc.tweaks)
opts := v1.ListOptions{}
tweaks(&opts)
// sort the field selector as entries are in non-deterministic order
sortFieldSelector := func(opts *v1.ListOptions) {
fs := strings.Split(opts.FieldSelector, ",")
sort.Strings(fs)
opts.FieldSelector = strings.Join(fs, ",")
}
sortFieldSelector(&opts)
sortFieldSelector(&tc.expectedOptions)
if !reflect.DeepEqual(tc.expectedOptions, opts) {
t.Errorf("expected list options %v, got %v", tc.expectedOptions, opts)
}
// sort namespaces as entries are in non-deterministic order
sort.Strings(namespaces)
sort.Strings(tc.expectedNamespaces)
if !reflect.DeepEqual(tc.expectedNamespaces, namespaces) {
t.Errorf("expected namespaces %v, got %v", tc.expectedNamespaces, namespaces)
}
})
}
}

59
pkg/informers/kube.go Normal file
View file

@ -0,0 +1,59 @@
// Copyright 2020 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package informers
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)
// NewKubeInformerFactories creates factories for kube resources
// for the given allowed, and denied namespaces these parameters being mutually exclusive.
// kubeClient, defaultResync, and tweakListOptions are being passed to the underlying informer factory.
func NewKubeInformerFactories(
allowNamespaces, denyNamespaces map[string]struct{},
kubeClient kubernetes.Interface,
defaultResync time.Duration,
tweakListOptions func(*metav1.ListOptions),
) FactoriesForNamespaces {
tweaks, namespaces := newInformerOptions(
allowNamespaces, denyNamespaces, tweakListOptions,
)
opts := []informers.SharedInformerOption{informers.WithTweakListOptions(tweaks)}
ret := kubeInformersForNamespaces{}
for _, namespace := range namespaces {
opts = append(opts, informers.WithNamespace(namespace))
ret[namespace] = informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResync, opts...)
}
return ret
}
type kubeInformersForNamespaces map[string]informers.SharedInformerFactory
func (i kubeInformersForNamespaces) Namespaces() sets.String {
return sets.StringKeySet(i)
}
func (i kubeInformersForNamespaces) ForResource(namespace string, resource schema.GroupVersionResource) (InformLister, error) {
return i[namespace].ForResource(resource)
}

View file

@ -0,0 +1,59 @@
// Copyright 2020 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package informers
import (
"time"
informers "github.com/prometheus-operator/prometheus-operator/pkg/client/informers/externalversions"
monitoring "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
)
// NewKubeInformerFactories creates factories for monitoring resources
// for the given allowed, and denied namespaces these parameters being mutually exclusive.
// monitoringClient, defaultResync, and tweakListOptions are being passed to the underlying informer factory.
func NewMonitoringInformerFactories(
allowNamespaces, denyNamespaces map[string]struct{},
monitoringClient monitoring.Interface,
defaultResync time.Duration,
tweakListOptions func(*metav1.ListOptions),
) FactoriesForNamespaces {
tweaks, namespaces := newInformerOptions(
allowNamespaces, denyNamespaces, tweakListOptions,
)
opts := []informers.SharedInformerOption{informers.WithTweakListOptions(tweaks)}
ret := monitoringInformersForNamespaces{}
for _, namespace := range namespaces {
opts = append(opts, informers.WithNamespace(namespace))
ret[namespace] = informers.NewSharedInformerFactoryWithOptions(monitoringClient, defaultResync, opts...)
}
return ret
}
type monitoringInformersForNamespaces map[string]informers.SharedInformerFactory
func (i monitoringInformersForNamespaces) Namespaces() sets.String {
return sets.StringKeySet(i)
}
func (i monitoringInformersForNamespaces) ForResource(namespace string, resource schema.GroupVersionResource) (InformLister, error) {
return i[namespace].ForResource(resource)
}

View file

@ -16,17 +16,13 @@ package listwatch
import (
"context"
"fmt"
"strings"
"sync"
"github.com/go-kit/kit/log"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
@ -66,15 +62,20 @@ func NewUnprivilegedNamespaceListWatchFromClient(l log.Logger, c cache.Getter, a
func NewFilteredUnprivilegedNamespaceListWatchFromClient(l log.Logger, c cache.Getter, allowedNamespaces, deniedNamespaces map[string]struct{}, optionsModifier func(options *metav1.ListOptions)) cache.ListerWatcher {
// If the only namespace given is `v1.NamespaceAll`, then this
// cache.ListWatch must be privileged. In this case, return a regular
// cache.ListWatch decorated with a denylist watcher
// cache.ListWatch tweaked with denylist fieldselector
// filtering the given denied namespaces.
if IsAllNamespaces(allowedNamespaces) {
return newDenylistListerWatcher(
l,
deniedNamespaces,
cache.NewFilteredListWatchFromClient(c, "namespaces", metav1.NamespaceAll, optionsModifier),
)
tweak := func(options *metav1.ListOptions) {
if optionsModifier != nil {
optionsModifier(options)
}
DenyTweak(options, "metadata.name", deniedNamespaces)
}
return cache.NewFilteredListWatchFromClient(c, "namespaces", metav1.NamespaceAll, tweak)
}
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
list := &v1.NamespaceList{}
@ -102,167 +103,6 @@ func NewFilteredUnprivilegedNamespaceListWatchFromClient(l log.Logger, c cache.G
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
// MultiNamespaceListerWatcher takes allowed and denied namespaces and a
// cache.ListerWatcher generator func and returns a single cache.ListerWatcher
// capable of operating on multiple namespaces.
//
// Allowed namespaces and denied namespaces are mutually exclusive.
// If allowed namespaces contain multiple items, the given denied namespaces have no effect.
// If the allowed namespaces includes exactly one entry with the value v1.NamespaceAll (empty string),
// the given denied namespaces are applied.
func MultiNamespaceListerWatcher(l log.Logger, allowedNamespaces, deniedNamespaces map[string]struct{}, f func(string) cache.ListerWatcher) cache.ListerWatcher {
// If there is only one namespace then there is no need to create a
// multi lister watcher proxy.
if IsAllNamespaces(allowedNamespaces) {
return newDenylistListerWatcher(l, deniedNamespaces, f(v1.NamespaceAll))
}
if len(allowedNamespaces) == 1 {
for n := range allowedNamespaces {
return f(n)
}
}
var lws []cache.ListerWatcher
for n := range allowedNamespaces {
lws = append(lws, f(n))
}
return multiListerWatcher(lws)
}
// multiListerWatcher abstracts several cache.ListerWatchers, allowing them
// to be treated as a single cache.ListerWatcher.
type multiListerWatcher []cache.ListerWatcher
// List implements the ListerWatcher interface.
// It combines the output of the List method of every ListerWatcher into
// a single result.
func (mlw multiListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
l := metav1.List{}
resourceVersions := sets.NewString()
for _, lw := range mlw {
list, err := lw.List(options)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
for _, item := range items {
l.Items = append(l.Items, runtime.RawExtension{Object: item.DeepCopyObject()})
}
if !resourceVersions.Has(metaObj.GetResourceVersion()) {
resourceVersions.Insert(metaObj.GetResourceVersion())
}
}
// Combine the resource versions so that the composite Watch method can
// distribute appropriate versions to each underlying Watch func.
l.ListMeta.ResourceVersion = strings.Join(resourceVersions.List(), "/")
return &l, nil
}
// Watch implements the ListerWatcher interface.
// It returns a watch.Interface that combines the output from the
// watch.Interface of every cache.ListerWatcher into a single result chan.
func (mlw multiListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
var resourceVersions string
// Allow resource versions to be "".
if options.ResourceVersion != "" {
rvs := strings.Split(options.ResourceVersion, "/")
if len(rvs) > 1 {
return nil, fmt.Errorf("expected resource version to have 1 part, got %d", len(rvs))
}
resourceVersions = options.ResourceVersion
}
return newMultiWatch(mlw, resourceVersions, options)
}
// multiWatch abstracts multiple watch.Interface's, allowing them
// to be treated as a single watch.Interface.
type multiWatch struct {
result chan watch.Event
stopped chan struct{}
stoppers []func()
}
// newMultiWatch returns a new multiWatch or an error if one of the underlying
// Watch funcs errored.
func newMultiWatch(lws []cache.ListerWatcher, resourceVersions string, options metav1.ListOptions) (*multiWatch, error) {
var (
result = make(chan watch.Event)
stopped = make(chan struct{})
stoppers []func()
wg sync.WaitGroup
)
wg.Add(len(lws))
for _, lw := range lws {
o := options.DeepCopy()
o.ResourceVersion = resourceVersions
w, err := lw.Watch(*o)
if err != nil {
return nil, err
}
go func() {
defer wg.Done()
for {
event, ok := <-w.ResultChan()
if !ok {
return
}
select {
case result <- event:
case <-stopped:
return
}
}
}()
stoppers = append(stoppers, w.Stop)
}
// result chan must be closed,
// once all event sender goroutines exited.
go func() {
wg.Wait()
close(result)
}()
return &multiWatch{
result: result,
stoppers: stoppers,
stopped: stopped,
}, nil
}
// ResultChan implements the watch.Interface interface.
func (mw *multiWatch) ResultChan() <-chan watch.Event {
return mw.result
}
// Stop implements the watch.Interface interface.
// It stops all of the underlying watch.Interfaces and closes the backing chan.
// Can safely be called more than once.
func (mw *multiWatch) Stop() {
select {
case <-mw.stopped:
// nothing to do, we are already stopped
default:
for _, stop := range mw.stoppers {
stop()
}
close(mw.stopped)
}
return
}
// IsAllNamespaces checks if the given map of namespaces
// contains only v1.NamespaceAll.
func IsAllNamespaces(namespaces map[string]struct{}) bool {
@ -284,3 +124,23 @@ func IdenticalNamespaces(a, b map[string]struct{}) bool {
return true
}
// DenyTweak modifies the given list options
// by adding a field selector not matching the given values.
func DenyTweak(options *metav1.ListOptions, field string, valueSet map[string]struct{}) {
if len(valueSet) == 0 {
return
}
var selectors []string
for value := range valueSet {
selectors = append(selectors, field+"!="+value)
}
if options.FieldSelector != "" {
selectors = append(selectors, options.FieldSelector)
}
options.FieldSelector = strings.Join(selectors, ",")
}

View file

@ -15,163 +15,9 @@
package listwatch
import (
"sync"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
var _ watch.Interface = &multiWatch{}
func setupMultiWatch(n int, t *testing.T, rvs string) ([]*watch.FakeWatcher, *multiWatch) {
ws := make([]*watch.FakeWatcher, n)
lws := make([]cache.ListerWatcher, n)
for i := range ws {
w := watch.NewFake()
ws[i] = w
lws[i] = &cache.ListWatch{WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
return w, nil
}}
}
m, err := newMultiWatch(lws, rvs, metav1.ListOptions{})
if err != nil {
t.Fatalf("failed to create new multiWatch: %v", err)
}
return ws, m
}
func TestNewMultiWatch(t *testing.T) {
func() {
defer func() {
if r := recover(); r != nil {
t.Errorf("newMultiWatch should not panic when number of resource versions matches ListerWatchers; got: %v", r)
}
}()
// Create a multiWatch from 1 ListerWatchers and pass 1 resource versions.
_, _ = setupMultiWatch(1, t, "1")
}()
}
func TestMultiWatchResultChan(t *testing.T) {
ws, m := setupMultiWatch(10, t, "10")
defer m.Stop()
var events []watch.Event
var wg sync.WaitGroup
for _, w := range ws {
w := w
wg.Add(1)
go func() {
w.Add(&runtime.Unknown{})
}()
}
go func() {
for {
event, ok := <-m.ResultChan()
if !ok {
break
}
events = append(events, event)
wg.Done()
}
}()
wg.Wait()
if len(events) != len(ws) {
t.Errorf("expected %d events but got %d", len(ws), len(events))
}
}
func TestMultiWatchStop(t *testing.T) {
ws, m := setupMultiWatch(10, t, "10")
m.Stop()
var stopped int
for _, w := range ws {
_, running := <-w.ResultChan()
if !running && w.IsStopped() {
stopped++
}
}
if stopped != len(ws) {
t.Errorf("expected %d watchers to be stopped but got %d", len(ws), stopped)
}
select {
case <-m.stopped:
// all good, watcher is closed, proceed
default:
t.Error("expected multiWatch to be stopped")
}
_, running := <-m.ResultChan()
if running {
t.Errorf("expected multiWatch chan to be closed")
}
}
type mockListerWatcher struct {
listResult runtime.Object
evCh chan watch.Event
stopped bool
}
func (m *mockListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
return m.listResult, nil
}
func (m *mockListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return m, nil
}
func (m *mockListerWatcher) Stop() {
m.stopped = true
}
func (m *mockListerWatcher) ResultChan() <-chan watch.Event {
return m.evCh
}
func TestRacyMultiWatch(t *testing.T) {
evCh := make(chan watch.Event)
lw := &mockListerWatcher{evCh: evCh}
mw, err := newMultiWatch(
[]cache.ListerWatcher{lw},
"foo",
metav1.ListOptions{},
)
if err != nil {
t.Error(err)
return
}
// this will not block, as newMultiWatch started a goroutine,
// receiving that event and block on the dispatching it there.
evCh <- watch.Event{
Type: "foo",
}
if got := <-mw.ResultChan(); got.Type != "foo" {
t.Errorf("expected foo, got %s", got.Type)
return
}
// Enqueue event, do not dequeue it.
// In conjunction with go test -race this asserts
// if there is a race between stopping and dispatching an event
evCh <- watch.Event{
Type: "bar",
}
mw.Stop()
if got := lw.stopped; got != true {
t.Errorf("expected watcher to be closed true, got %t", got)
}
// some reentrant calls, should be non-blocking
mw.Stop()
mw.Stop()
}
func TestIdenticalNamespaces(t *testing.T) {
for _, tc := range []struct {
a, b map[string]struct{}

View file

@ -28,6 +28,28 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
type mockListerWatcher struct {
listResult runtime.Object
evCh chan watch.Event
stopped bool
}
func (m *mockListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
return m.listResult, nil
}
func (m *mockListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return m, nil
}
func (m *mockListerWatcher) Stop() {
m.stopped = true
}
func (m *mockListerWatcher) ResultChan() <-chan watch.Event {
return m.evCh
}
func newUnstructured(namespace string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{

View file

@ -15,8 +15,7 @@
package prometheus
import (
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
v1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/tools/cache"
)
@ -33,11 +32,15 @@ var (
)
type prometheusCollector struct {
store cache.Store
stores []cache.Store
}
func NewPrometheusCollector(s cache.Store) *prometheusCollector {
return &prometheusCollector{store: s}
return &prometheusCollector{stores: []cache.Store{s}}
}
func NewPrometheusCollectorForStores(s ...cache.Store) *prometheusCollector {
return &prometheusCollector{stores: s}
}
// Describe implements the prometheus.Collector interface.
@ -47,8 +50,10 @@ func (c *prometheusCollector) Describe(ch chan<- *prometheus.Desc) {
// Collect implements the prometheus.Collector interface.
func (c *prometheusCollector) Collect(ch chan<- prometheus.Metric) {
for _, p := range c.store.List() {
c.collectPrometheus(ch, p.(*v1.Prometheus))
for _, s := range c.stores {
for _, p := range s.List() {
c.collectPrometheus(ch, p.(*v1.Prometheus))
}
}
}

View file

@ -25,6 +25,7 @@ import (
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
"github.com/prometheus-operator/prometheus-operator/pkg/informers"
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
"github.com/prometheus-operator/prometheus-operator/pkg/listwatch"
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
@ -41,9 +42,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
@ -61,17 +60,18 @@ type Operator struct {
mclient monitoringclient.Interface
logger log.Logger
promInf cache.SharedIndexInformer
smonInf cache.SharedIndexInformer
pmonInf cache.SharedIndexInformer
probeInf cache.SharedIndexInformer
ruleInf cache.SharedIndexInformer
cmapInf cache.SharedIndexInformer
secrInf cache.SharedIndexInformer
ssetInf cache.SharedIndexInformer
nsPromInf cache.SharedIndexInformer
nsMonInf cache.SharedIndexInformer
promInfs *informers.ForResource
smonInfs *informers.ForResource
pmonInfs *informers.ForResource
probeInfs *informers.ForResource
ruleInfs *informers.ForResource
cmapInfs *informers.ForResource
secrInfs *informers.ForResource
ssetInfs *informers.ForResource
queue workqueue.RateLimitingInterface
metrics *operator.Metrics
@ -248,129 +248,129 @@ func New(ctx context.Context, conf Config, logger log.Logger, r prometheus.Regis
}
c.metrics.MustRegister(c.nodeAddressLookupErrors, c.nodeEndpointSyncs, c.nodeEndpointSyncErrors)
c.promInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = c.config.PromSelector
return mclient.MonitoringV1().Prometheuses(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = c.config.PromSelector
return mclient.MonitoringV1().Prometheuses(namespace).Watch(ctx, options)
},
}
}),
c.promInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.config.Namespaces.PrometheusAllowList,
c.config.Namespaces.DenyList,
mclient,
resyncPeriod,
func(options *metav1.ListOptions) {
options.LabelSelector = c.config.PromSelector
},
),
&monitoringv1.Prometheus{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PrometheusName),
)
c.metrics.MustRegister(NewPrometheusCollector(c.promInf.GetStore()))
c.metrics.MustRegister(operator.NewStoreCollector("prometheus", c.promInf.GetStore()))
if err != nil {
return nil, errors.Wrap(err, "error creating prometheus informers")
}
c.smonInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().ServiceMonitors(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().ServiceMonitors(namespace).Watch(ctx, options)
},
}
}),
),
&monitoringv1.ServiceMonitor{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
c.metrics.MustRegister(operator.NewStoreCollector("servicemonitor", c.smonInf.GetStore()))
var promStores []cache.Store
for _, informer := range c.promInfs.GetInformers() {
promStores = append(promStores, informer.Informer().GetStore())
}
c.metrics.MustRegister(NewPrometheusCollectorForStores(promStores...))
c.pmonInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().PodMonitors(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().PodMonitors(namespace).Watch(ctx, options)
},
}
}),
c.smonInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.config.Namespaces.AllowList,
c.config.Namespaces.DenyList,
mclient,
resyncPeriod,
nil,
),
&monitoringv1.PodMonitor{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName),
)
c.metrics.MustRegister(operator.NewStoreCollector("podmonitor", c.pmonInf.GetStore()))
if err != nil {
return nil, errors.Wrap(err, "error creating servicemonitor informers")
}
c.probeInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, err error) {
return mclient.MonitoringV1().Probes(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (w watch.Interface, err error) {
return mclient.MonitoringV1().Probes(namespace).Watch(ctx, options)
},
}
}),
c.pmonInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.config.Namespaces.AllowList,
c.config.Namespaces.DenyList,
mclient,
resyncPeriod,
nil,
),
&monitoringv1.Probe{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName),
)
c.metrics.MustRegister(operator.NewStoreCollector("probe", c.probeInf.GetStore()))
if err != nil {
return nil, errors.Wrap(err, "error creating podmonitor informers")
}
c.ruleInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).Watch(ctx, options)
},
}
}),
c.probeInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.config.Namespaces.AllowList,
c.config.Namespaces.DenyList,
mclient,
resyncPeriod,
nil,
),
&monitoringv1.PrometheusRule{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ProbeName),
)
c.metrics.MustRegister(operator.NewStoreCollector("prometheurule", c.ruleInf.GetStore()))
if err != nil {
return nil, errors.Wrap(err, "error creating probe informers")
}
c.cmapInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = labelPrometheusName
return c.kclient.CoreV1().ConfigMaps(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelPrometheusName
return c.kclient.CoreV1().ConfigMaps(namespace).Watch(ctx, options)
},
}
}),
c.ruleInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.config.Namespaces.AllowList,
c.config.Namespaces.DenyList,
mclient,
resyncPeriod,
nil,
),
&v1.ConfigMap{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PrometheusRuleName),
)
if err != nil {
return nil, errors.Wrap(err, "error creating prometheusrule informers")
}
c.secrInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return cache.NewListWatchFromClient(c.kclient.CoreV1().RESTClient(), "secrets", namespace, secretListWatchSelector)
}),
c.cmapInfs, err = informers.NewInformersForResource(
informers.NewKubeInformerFactories(
c.config.Namespaces.AllowList,
c.config.Namespaces.DenyList,
c.kclient,
resyncPeriod,
func(options *metav1.ListOptions) {
options.LabelSelector = labelPrometheusName
},
),
&v1.Secret{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
v1.SchemeGroupVersion.WithResource(string(v1.ResourceConfigMaps)),
)
if err != nil {
return nil, errors.Wrap(err, "error creating configmap informers")
}
c.ssetInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return cache.NewListWatchFromClient(c.kclient.AppsV1().RESTClient(), "statefulsets", namespace, fields.Everything())
}),
c.secrInfs, err = informers.NewInformersForResource(
informers.NewKubeInformerFactories(
c.config.Namespaces.PrometheusAllowList,
c.config.Namespaces.DenyList,
c.kclient,
resyncPeriod,
func(options *metav1.ListOptions) {
options.FieldSelector = secretListWatchSelector.String()
},
),
&appsv1.StatefulSet{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
v1.SchemeGroupVersion.WithResource(string(v1.ResourceSecrets)),
)
if err != nil {
return nil, errors.Wrap(err, "error creating secrets informers")
}
c.ssetInfs, err = informers.NewInformersForResource(
informers.NewKubeInformerFactories(
c.config.Namespaces.PrometheusAllowList,
c.config.Namespaces.DenyList,
c.kclient,
resyncPeriod,
nil,
),
appsv1.SchemeGroupVersion.WithResource("statefulsets"),
)
if err != nil {
return nil, errors.Wrap(err, "error creating statefulset informers")
}
newNamespaceInformer := func(o *Operator, allowList map[string]struct{}) cache.SharedIndexInformer {
// nsResyncPeriod is used to control how often the namespace informer
@ -406,18 +406,34 @@ func New(ctx context.Context, conf Config, logger log.Logger, r prometheus.Regis
// waitForCacheSync waits for the informers' caches to be synced.
func (c *Operator) waitForCacheSync(ctx context.Context) error {
ok := true
for _, infs := range []struct {
name string
informersForResource *informers.ForResource
}{
{"Prometheus", c.promInfs},
{"ServiceMonitor", c.smonInfs},
{"PodMonitor", c.pmonInfs},
{"PrometheusRule", c.ruleInfs},
{"Probe", c.probeInfs},
{"ConfigMap", c.cmapInfs},
{"Secret", c.secrInfs},
{"StatefulSet", c.ssetInfs},
} {
for _, inf := range infs.informersForResource.GetInformers() {
if !cache.WaitForCacheSync(ctx.Done(), inf.Informer().HasSynced) {
level.Error(c.logger).Log("msg", fmt.Sprintf("failed to sync %s cache", infs.name))
ok = false
} else {
level.Debug(c.logger).Log("msg", fmt.Sprintf("successfully synced %s cache", infs.name))
}
}
}
informers := []struct {
name string
informer cache.SharedIndexInformer
}{
{"Prometheus", c.promInf},
{"ServiceMonitor", c.smonInf},
{"PodMonitor", c.pmonInf},
{"Probe", c.probeInf},
{"PrometheusRule", c.ruleInf},
{"ConfigMap", c.cmapInf},
{"Secret", c.secrInf},
{"StatefulSet", c.ssetInf},
{"PromNamespace", c.nsPromInf},
{"MonNamespace", c.nsMonInf},
}
@ -438,42 +454,44 @@ func (c *Operator) waitForCacheSync(ctx context.Context) error {
// addHandlers adds the eventhandlers to the informers.
func (c *Operator) addHandlers() {
c.promInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.promInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePrometheusAdd,
DeleteFunc: c.handlePrometheusDelete,
UpdateFunc: c.handlePrometheusUpdate,
})
c.smonInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.smonInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleSmonAdd,
DeleteFunc: c.handleSmonDelete,
UpdateFunc: c.handleSmonUpdate,
})
c.pmonInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.pmonInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePmonAdd,
DeleteFunc: c.handlePmonDelete,
UpdateFunc: c.handlePmonUpdate,
})
c.probeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.probeInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleBmonAdd,
UpdateFunc: c.handleBmonUpdate,
DeleteFunc: c.handleBmonDelete,
})
c.ruleInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.ruleInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleRuleAdd,
DeleteFunc: c.handleRuleDelete,
UpdateFunc: c.handleRuleUpdate,
})
c.cmapInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.cmapInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleConfigMapAdd,
DeleteFunc: c.handleConfigMapDelete,
UpdateFunc: c.handleConfigMapUpdate,
})
c.secrInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.secrInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleSecretAdd,
DeleteFunc: c.handleSecretDelete,
UpdateFunc: c.handleSecretUpdate,
})
c.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.ssetInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleStatefulSetAdd,
DeleteFunc: c.handleStatefulSetDelete,
UpdateFunc: c.handleStatefulSetUpdate,
@ -507,14 +525,14 @@ func (c *Operator) Run(ctx context.Context) error {
go c.worker(ctx)
go c.promInf.Run(ctx.Done())
go c.smonInf.Run(ctx.Done())
go c.pmonInf.Run(ctx.Done())
go c.probeInf.Run(ctx.Done())
go c.ruleInf.Run(ctx.Done())
go c.cmapInf.Run(ctx.Done())
go c.secrInf.Run(ctx.Done())
go c.ssetInf.Run(ctx.Done())
go c.promInfs.Start(ctx.Done())
go c.smonInfs.Start(ctx.Done())
go c.pmonInfs.Start(ctx.Done())
go c.probeInfs.Start(ctx.Done())
go c.ruleInfs.Start(ctx.Done())
go c.cmapInfs.Start(ctx.Done())
go c.secrInfs.Start(ctx.Done())
go c.ssetInfs.Start(ctx.Done())
go c.nsMonInf.Run(ctx.Done())
if c.nsPromInf != c.nsMonInf {
go c.nsPromInf.Run(ctx.Done())
@ -999,7 +1017,16 @@ func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) {
}
ns := nsObject.(*v1.Namespace)
err = cache.ListAll(c.promInf.GetStore(), labels.Everything(), func(obj interface{}) {
objs, err := c.promInfs.List(labels.Everything())
if err != nil {
level.Error(c.logger).Log(
"msg", "listing all Prometheus instances from cache failed",
"err", err,
)
return
}
for _, obj := range objs {
// Check for Prometheus instances in the namespace.
p := obj.(*monitoringv1.Prometheus)
if p.Namespace == nsName {
@ -1068,12 +1095,6 @@ func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) {
c.enqueue(p)
return
}
})
if err != nil {
level.Error(c.logger).Log(
"msg", "listing all Prometheus instances from cache failed",
"err", err,
)
}
}
@ -1113,14 +1134,17 @@ func (c *Operator) prometheusForStatefulSet(sset interface{}) *monitoringv1.Prom
}
promKey := statefulSetKeyToPrometheusKey(key)
p, exists, err := c.promInf.GetStore().GetByKey(promKey)
p, err := c.promInfs.Get(promKey)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
level.Error(c.logger).Log("msg", "Prometheus lookup failed", "err", err)
return nil
}
if !exists {
return nil
}
return p.(*monitoringv1.Prometheus)
}
@ -1182,16 +1206,17 @@ func (c *Operator) handleStatefulSetUpdate(oldo, curo interface{}) {
}
func (c *Operator) sync(ctx context.Context, key string) error {
obj, exists, err := c.promInf.GetIndexer().GetByKey(key)
if err != nil {
return err
}
if !exists {
pobj, err := c.promInfs.Get(key)
if apierrors.IsNotFound(err) {
// Dependent resources are cleaned up by K8s via OwnerReferences
return nil
}
if err != nil {
return err
}
p := obj.(*monitoringv1.Prometheus)
p := pobj.(*monitoringv1.Prometheus)
p = p.DeepCopy()
p.APIVersion = monitoringv1.SchemeGroupVersion.String()
p.Kind = monitoringv1.PrometheusesKind
@ -1224,11 +1249,14 @@ func (c *Operator) sync(ctx context.Context, key string) error {
ssetClient := c.kclient.AppsV1().StatefulSets(p.Namespace)
// Ensure we have a StatefulSet running Prometheus deployed.
obj, exists, err = c.ssetInf.GetIndexer().GetByKey(prometheusKeyToStatefulSetKey(key))
if err != nil {
obj, err := c.ssetInfs.Get(prometheusKeyToStatefulSetKey(key))
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "retrieving statefulset failed")
}
exists := !apierrors.IsNotFound(err)
spec := appsv1.StatefulSetSpec{}
if obj != nil {
ss := obj.(*appsv1.StatefulSet)
@ -1642,7 +1670,7 @@ func (c *Operator) selectServiceMonitors(ctx context.Context, p *monitoringv1.Pr
level.Debug(c.logger).Log("msg", "filtering namespaces to select ServiceMonitors from", "namespaces", strings.Join(namespaces, ","), "namespace", p.Namespace, "prometheus", p.Name)
for _, ns := range namespaces {
cache.ListAllByNamespace(c.smonInf.GetIndexer(), ns, servMonSelector, func(obj interface{}) {
c.smonInfs.ListAllByNamespace(ns, servMonSelector, func(obj interface{}) {
k, ok := c.keyFunc(obj)
if ok {
serviceMonitors[k] = obj.(*monitoringv1.ServiceMonitor)
@ -1730,7 +1758,7 @@ func (c *Operator) selectPodMonitors(p *monitoringv1.Prometheus) (map[string]*mo
podMonitors := []string{}
for _, ns := range namespaces {
cache.ListAllByNamespace(c.pmonInf.GetIndexer(), ns, podMonSelector, func(obj interface{}) {
c.pmonInfs.ListAllByNamespace(ns, podMonSelector, func(obj interface{}) {
k, ok := c.keyFunc(obj)
if ok {
res[k] = obj.(*monitoringv1.PodMonitor)
@ -1773,7 +1801,7 @@ func (c *Operator) selectProbes(p *monitoringv1.Prometheus) (map[string]*monitor
probes := make([]string, 0)
for _, ns := range namespaces {
cache.ListAllByNamespace(c.probeInf.GetIndexer(), ns, bMonSelector, func(obj interface{}) {
c.probeInfs.ListAllByNamespace(ns, bMonSelector, func(obj interface{}) {
if k, ok := c.keyFunc(obj); ok {
res[k] = obj.(*monitoringv1.Probe)
probes = append(probes, k)

View file

@ -27,7 +27,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"github.com/ghodss/yaml"
"github.com/go-kit/kit/log/level"
@ -179,7 +178,7 @@ func (c *Operator) selectRules(p *monitoringv1.Prometheus, namespaces []string)
for _, ns := range namespaces {
var marshalErr error
err := cache.ListAllByNamespace(c.ruleInf.GetIndexer(), ns, ruleSelector, func(obj interface{}) {
err := c.ruleInfs.ListAllByNamespace(ns, ruleSelector, func(obj interface{}) {
promRule := obj.(*monitoringv1.PrometheusRule).DeepCopy()
if err := nsLabeler.EnforceNamespaceLabel(promRule); err != nil {

View file

@ -24,10 +24,10 @@ import (
"github.com/mitchellh/hashstructure"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
"github.com/prometheus-operator/prometheus-operator/pkg/informers"
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
"github.com/prometheus-operator/prometheus-operator/pkg/listwatch"
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
prometheusoperator "github.com/prometheus-operator/prometheus-operator/pkg/prometheus"
"github.com/go-kit/kit/log"
@ -41,9 +41,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
@ -62,12 +60,13 @@ type Operator struct {
mclient monitoringclient.Interface
logger log.Logger
thanosRulerInf cache.SharedIndexInformer
thanosRulerInfs *informers.ForResource
cmapInfs *informers.ForResource
ruleInfs *informers.ForResource
ssetInfs *informers.ForResource
nsThanosRulerInf cache.SharedIndexInformer
nsRuleInf cache.SharedIndexInformer
cmapInf cache.SharedIndexInformer
ruleInf cache.SharedIndexInformer
ssetInf cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
@ -137,68 +136,65 @@ func New(ctx context.Context, conf prometheusoperator.Config, logger log.Logger,
},
}
o.cmapInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.ThanosRulerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = labelThanosRulerName
return o.kclient.CoreV1().ConfigMaps(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelThanosRulerName
return o.kclient.CoreV1().ConfigMaps(namespace).Watch(ctx, options)
},
}
}),
o.cmapInfs, err = informers.NewInformersForResource(
informers.NewKubeInformerFactories(
o.config.Namespaces.ThanosRulerAllowList,
o.config.Namespaces.DenyList,
o.kclient,
resyncPeriod,
func(options *metav1.ListOptions) {
options.LabelSelector = labelThanosRulerName
},
),
&v1.ConfigMap{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
v1.SchemeGroupVersion.WithResource(string(v1.ResourceConfigMaps)),
)
if err != nil {
return nil, errors.Wrap(err, "error creating configmap informers")
}
o.thanosRulerInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.ThanosRulerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = o.config.ThanosRulerSelector
return o.mclient.MonitoringV1().ThanosRulers(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = o.config.ThanosRulerSelector
return o.mclient.MonitoringV1().ThanosRulers(namespace).Watch(ctx, options)
},
}
}),
o.thanosRulerInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
o.config.Namespaces.ThanosRulerAllowList,
o.config.Namespaces.DenyList,
mclient,
resyncPeriod,
func(options *metav1.ListOptions) {
options.LabelSelector = o.config.ThanosRulerSelector
},
),
&monitoringv1.ThanosRuler{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ThanosRulerName),
)
if err != nil {
return nil, errors.Wrap(err, "error creating thanosruler informers")
}
o.metrics.MustRegister(NewThanosRulerCollector(o.thanosRulerInf.GetStore()))
o.metrics.MustRegister(operator.NewStoreCollector("thanosruler", o.thanosRulerInf.GetStore()))
o.ruleInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.AllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).Watch(ctx, options)
},
}
}),
o.ruleInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
o.config.Namespaces.AllowList,
o.config.Namespaces.DenyList,
mclient,
resyncPeriod,
nil,
),
&monitoringv1.PrometheusRule{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PrometheusRuleName),
)
o.metrics.MustRegister(operator.NewStoreCollector("prometheusrule", o.thanosRulerInf.GetStore()))
if err != nil {
return nil, errors.Wrap(err, "error creating prometheusrule informers")
}
o.ssetInf = cache.NewSharedIndexInformer(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.ThanosRulerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return cache.NewListWatchFromClient(o.kclient.AppsV1().RESTClient(), "statefulsets", namespace, fields.Everything())
}),
&appsv1.StatefulSet{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
o.ssetInfs, err = informers.NewInformersForResource(
informers.NewKubeInformerFactories(
o.config.Namespaces.ThanosRulerAllowList,
o.config.Namespaces.DenyList,
o.kclient,
resyncPeriod,
nil,
),
appsv1.SchemeGroupVersion.WithResource("statefulsets"),
)
if err != nil {
return nil, errors.Wrap(err, "error creating statefulset informers")
}
newNamespaceInformer := func(o *Operator, allowList map[string]struct{}) cache.SharedIndexInformer {
// nsResyncPeriod is used to control how often the namespace informer
@ -234,16 +230,32 @@ func New(ctx context.Context, conf prometheusoperator.Config, logger log.Logger,
// waitForCacheSync waits for the informers' caches to be synced.
func (o *Operator) waitForCacheSync(stopc <-chan struct{}) error {
ok := true
for _, infs := range []struct {
name string
informersForResource *informers.ForResource
}{
{"ThanosRuler", o.thanosRulerInfs},
{"ConfigMap", o.cmapInfs},
{"PrometheusRule", o.ruleInfs},
{"StatefulSet", o.ssetInfs},
} {
for _, inf := range infs.informersForResource.GetInformers() {
if !cache.WaitForCacheSync(stopc, inf.Informer().HasSynced) {
level.Error(o.logger).Log("msg", fmt.Sprintf("failed to sync %s cache", infs.name))
ok = false
} else {
level.Debug(o.logger).Log("msg", fmt.Sprintf("successfully synced %s cache", infs.name))
}
}
}
informers := []struct {
name string
informer cache.SharedIndexInformer
}{
{"ThanosRuler", o.thanosRulerInf},
{"ThanosRulerNamespace", o.nsThanosRulerInf},
{"RuleNamespace", o.nsRuleInf},
{"ConfigMap", o.cmapInf},
{"PrometheusRule", o.ruleInf},
{"StatefulSet", o.ssetInf},
}
for _, inf := range informers {
if !cache.WaitForCacheSync(stopc, inf.informer.HasSynced) {
@ -262,22 +274,22 @@ func (o *Operator) waitForCacheSync(stopc <-chan struct{}) error {
// addHandlers adds the eventhandlers to the informers.
func (o *Operator) addHandlers() {
o.thanosRulerInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
o.thanosRulerInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleThanosRulerAdd,
DeleteFunc: o.handleThanosRulerDelete,
UpdateFunc: o.handleThanosRulerUpdate,
})
o.cmapInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
o.cmapInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleConfigMapAdd,
DeleteFunc: o.handleConfigMapDelete,
UpdateFunc: o.handleConfigMapUpdate,
})
o.ruleInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
o.ruleInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleRuleAdd,
DeleteFunc: o.handleRuleDelete,
UpdateFunc: o.handleRuleUpdate,
})
o.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
o.ssetInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleStatefulSetAdd,
DeleteFunc: o.handleStatefulSetDelete,
UpdateFunc: o.handleStatefulSetUpdate,
@ -311,14 +323,14 @@ func (o *Operator) Run(ctx context.Context) error {
go o.worker(ctx)
go o.thanosRulerInf.Run(ctx.Done())
go o.cmapInf.Run(ctx.Done())
go o.ruleInf.Run(ctx.Done())
go o.thanosRulerInfs.Start(ctx.Done())
go o.cmapInfs.Start(ctx.Done())
go o.ruleInfs.Start(ctx.Done())
go o.nsRuleInf.Run(ctx.Done())
if o.nsRuleInf != o.nsThanosRulerInf {
go o.nsThanosRulerInf.Run(ctx.Done())
}
go o.ssetInf.Run(ctx.Done())
go o.ssetInfs.Start(ctx.Done())
if err := o.waitForCacheSync(ctx.Done()); err != nil {
return err
}
@ -453,14 +465,16 @@ func (o *Operator) thanosForStatefulSet(sset interface{}) *monitoringv1.ThanosRu
}
thanosKey := statefulSetKeyToThanosKey(key)
tr, exists, err := o.thanosRulerInf.GetStore().GetByKey(thanosKey)
tr, err := o.thanosRulerInfs.Get(thanosKey)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
level.Error(o.logger).Log("msg", "ThanosRuler lookup failed", "err", err)
return nil
}
if !exists {
return nil
}
return tr.(*monitoringv1.ThanosRuler)
}
@ -569,16 +583,16 @@ func (o *Operator) processNextWorkItem(ctx context.Context) bool {
}
func (o *Operator) sync(ctx context.Context, key string) error {
obj, exists, err := o.thanosRulerInf.GetIndexer().GetByKey(key)
if err != nil {
return err
}
if !exists {
trobj, err := o.thanosRulerInfs.Get(key)
if apierrors.IsNotFound(err) {
// Dependent resources are cleaned up by K8s via OwnerReferences
return nil
}
if err != nil {
return err
}
tr := obj.(*monitoringv1.ThanosRuler)
tr := trobj.(*monitoringv1.ThanosRuler)
tr = tr.DeepCopy()
tr.APIVersion = monitoringv1.SchemeGroupVersion.String()
tr.Kind = monitoringv1.ThanosRulerKind
@ -602,11 +616,14 @@ func (o *Operator) sync(ctx context.Context, key string) error {
// Ensure we have a StatefulSet running Thanos deployed.
ssetClient := o.kclient.AppsV1().StatefulSets(tr.Namespace)
obj, exists, err = o.ssetInf.GetIndexer().GetByKey(thanosKeyToStatefulSetKey(key))
if err != nil {
obj, err := o.ssetInfs.Get(thanosKeyToStatefulSetKey(key))
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "retrieving statefulset failed")
}
exists := !apierrors.IsNotFound(err)
if !exists {
sset, err := makeStatefulSet(tr, o.config, ruleConfigMapNames, "")
if err != nil {
@ -787,7 +804,9 @@ func (o *Operator) enqueueForNamespace(store cache.Store, nsName string) {
}
ns := nsObject.(*v1.Namespace)
err = cache.ListAll(o.thanosRulerInf.GetStore(), labels.Everything(), func(obj interface{}) {
objs, err := o.thanosRulerInfs.List(labels.Everything())
for _, obj := range objs {
// Check for ThanosRuler instances in the namespace.
tr := obj.(*monitoringv1.ThanosRuler)
if tr.Namespace == nsName {
@ -810,7 +829,7 @@ func (o *Operator) enqueueForNamespace(store cache.Store, nsName string) {
o.enqueue(tr)
return
}
})
}
if err != nil {
level.Error(o.logger).Log(
"msg", "listing all ThanosRuler instances from cache failed",

View file

@ -27,7 +27,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"github.com/ghodss/yaml"
"github.com/go-kit/kit/log/level"
@ -179,7 +178,7 @@ func (o *Operator) selectRules(t *monitoringv1.ThanosRuler, namespaces []string)
for _, ns := range namespaces {
var marshalErr error
err := cache.ListAllByNamespace(o.ruleInf.GetIndexer(), ns, ruleSelector, func(obj interface{}) {
err := o.ruleInfs.ListAllByNamespace(ns, ruleSelector, func(obj interface{}) {
promRule := obj.(*monitoringv1.PrometheusRule).DeepCopy()
if err := nsLabeler.EnforceNamespaceLabel(promRule); err != nil {

View file

@ -31,7 +31,7 @@ func testAlertmanagerInstanceNamespaces_AllNs(t *testing.T) {
nonInstanceNs := ctx.CreateNamespace(t, framework.KubeClient)
ctx.SetupPrometheusRBACGlobal(t, instanceNs, framework.KubeClient)
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, nil, nil, nil, []string{instanceNs}, false)
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, nil, nil, nil, []string{instanceNs}, false, true)
if err != nil {
t.Fatal(err)
}
@ -72,7 +72,7 @@ func testAlertmanagerInstanceNamespaces_DenyNs(t *testing.T) {
instanceNs := ctx.CreateNamespace(t, framework.KubeClient)
ctx.SetupPrometheusRBACGlobal(t, instanceNs, framework.KubeClient)
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, nil, []string{instanceNs}, nil, []string{instanceNs}, false)
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, nil, []string{instanceNs}, nil, []string{instanceNs}, false, true)
if err != nil {
t.Fatal(err)
}

View file

@ -20,8 +20,8 @@ import (
"github.com/gogo/protobuf/proto"
testFramework "github.com/prometheus-operator/prometheus-operator/test/framework"
"github.com/pkg/errors"
testFramework "github.com/prometheus-operator/prometheus-operator/test/framework"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
api_errors "k8s.io/apimachinery/pkg/api/errors"
@ -38,7 +38,7 @@ func testDenyPrometheus(t *testing.T) {
ctx.SetupPrometheusRBAC(t, operatorNamespace, framework.KubeClient)
_, err := framework.CreatePrometheusOperator(operatorNamespace, *opImage, nil, deniedNamespaces, nil, nil, false)
_, err := framework.CreatePrometheusOperator(operatorNamespace, *opImage, nil, deniedNamespaces, nil, nil, false, true)
if err != nil {
t.Fatal(err)
}
@ -69,6 +69,13 @@ func testDenyPrometheus(t *testing.T) {
t.Fatalf("expected not to find a Prometheus statefulset, but did: %v/%v", sts.Namespace, sts.Name)
}
}
for _, allowed := range allowedNamespaces {
err := framework.DeletePrometheusAndWaitUntilGone(allowed, "allowed")
if err != nil {
t.Fatal(err)
}
}
}
func testDenyServiceMonitor(t *testing.T) {
@ -81,7 +88,7 @@ func testDenyServiceMonitor(t *testing.T) {
ctx.SetupPrometheusRBAC(t, operatorNamespace, framework.KubeClient)
_, err := framework.CreatePrometheusOperator(operatorNamespace, *opImage, nil, deniedNamespaces, nil, nil, false)
_, err := framework.CreatePrometheusOperator(operatorNamespace, *opImage, nil, deniedNamespaces, nil, nil, false, true)
if err != nil {
t.Fatal(err)
}
@ -176,4 +183,62 @@ func testDenyServiceMonitor(t *testing.T) {
t.Fatalf("expected to have 1 target, got %d", got)
}
}
for _, allowed := range allowedNamespaces {
if err := framework.MonClientV1.ServiceMonitors(allowed).Delete(context.TODO(), "allowed", metav1.DeleteOptions{}); err != nil {
t.Fatal("Deleting ServiceMonitor failed: ", err)
}
if err := framework.WaitForActiveTargets(allowed, "prometheus-allowed", 0); err != nil {
t.Fatal(err)
}
}
}
func testDenyThanosRuler(t *testing.T) {
ctx := framework.NewTestCtx(t)
defer ctx.Cleanup(t)
operatorNamespace := ctx.CreateNamespace(t, framework.KubeClient)
allowedNamespaces := []string{ctx.CreateNamespace(t, framework.KubeClient), ctx.CreateNamespace(t, framework.KubeClient)}
deniedNamespaces := []string{ctx.CreateNamespace(t, framework.KubeClient), ctx.CreateNamespace(t, framework.KubeClient)}
ctx.SetupPrometheusRBAC(t, operatorNamespace, framework.KubeClient)
_, err := framework.CreatePrometheusOperator(operatorNamespace, *opImage, nil, deniedNamespaces, nil, nil, false, true)
if err != nil {
t.Fatal(err)
}
for _, denied := range deniedNamespaces {
tr := framework.MakeBasicThanosRuler("denied", 1)
_, err = framework.MonClientV1.ThanosRulers(denied).Create(context.TODO(), tr, metav1.CreateOptions{})
if err != nil {
t.Fatalf("creating %v Prometheus instances failed (%v): %v", tr.Spec.Replicas, tr.Name, err)
}
}
for _, allowed := range allowedNamespaces {
ctx.SetupPrometheusRBAC(t, allowed, framework.KubeClient)
if _, err := framework.CreateThanosRulerAndWaitUntilReady(allowed, framework.MakeBasicThanosRuler("allowed", 1)); err != nil {
t.Fatal(err)
}
}
for _, denied := range deniedNamespaces {
// this is not ideal, as we cannot really find out if prometheus operator did not reconcile the denied thanos ruler.
// nevertheless it is very likely that it reconciled it as the allowed prometheus is up.
sts, err := framework.KubeClient.AppsV1().StatefulSets(denied).Get(context.TODO(), "thanosruler-denied", metav1.GetOptions{})
if !api_errors.IsNotFound(err) {
t.Fatalf("expected not to find a Prometheus statefulset, but did: %v/%v", sts.Namespace, sts.Name)
}
}
for _, allowed := range allowedNamespaces {
err := framework.DeleteThanosRulerAndWaitUntilGone(allowed, "allowed")
if err != nil {
t.Fatal(err)
}
}
}

View file

@ -86,7 +86,7 @@ func TestAllNS(t *testing.T) {
ns := ctx.CreateNamespace(t, framework.KubeClient)
finalizers, err := framework.CreatePrometheusOperator(ns, *opImage, nil, nil, nil, nil, true)
finalizers, err := framework.CreatePrometheusOperator(ns, *opImage, nil, nil, nil, nil, true, true)
if err != nil {
t.Fatal(err)
}
@ -223,6 +223,7 @@ func TestDenylist(t *testing.T) {
testFuncs := map[string]func(t *testing.T){
"Prometheus": testDenyPrometheus,
"ServiceMonitor": testDenyServiceMonitor,
"ThanosRuler": testDenyThanosRuler,
}
for name, f := range testFuncs {

View file

@ -18,8 +18,8 @@ import (
"context"
"testing"
testFramework "github.com/prometheus-operator/prometheus-operator/test/framework"
"github.com/pkg/errors"
testFramework "github.com/prometheus-operator/prometheus-operator/test/framework"
v1 "k8s.io/api/core/v1"
api_errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -34,7 +34,7 @@ func testPrometheusInstanceNamespaces_AllNs(t *testing.T) {
nonInstanceNs := ctx.CreateNamespace(t, framework.KubeClient)
ctx.SetupPrometheusRBACGlobal(t, instanceNs, framework.KubeClient)
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, nil, nil, []string{instanceNs}, nil, false)
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, nil, nil, []string{instanceNs}, nil, false, true)
if err != nil {
t.Fatal(err)
}
@ -93,7 +93,7 @@ func testPrometheusInstanceNamespaces_DenyList(t *testing.T) {
}
}
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, nil, []string{deniedNs, instanceNs}, []string{instanceNs}, nil, false)
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, nil, []string{deniedNs, instanceNs}, []string{instanceNs}, nil, false, true)
if err != nil {
t.Fatal(err)
}
@ -216,7 +216,7 @@ func testPrometheusInstanceNamespaces_AllowList(t *testing.T) {
}
}
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, []string{allowedNs}, nil, []string{instanceNs}, nil, false)
_, err := framework.CreatePrometheusOperator(operatorNs, *opImage, []string{allowedNs}, nil, []string{instanceNs}, nil, false, false)
if err != nil {
t.Fatal(err)
}

View file

@ -2337,7 +2337,7 @@ func testOperatorNSScope(t *testing.T) {
}
// Prometheus Operator only watches single namespace mainNS, not arbitraryNS.
_, err := framework.CreatePrometheusOperator(operatorNS, *opImage, []string{mainNS}, nil, nil, nil, false)
_, err := framework.CreatePrometheusOperator(operatorNS, *opImage, []string{mainNS}, nil, nil, nil, false, true)
if err != nil {
t.Fatal(err)
}
@ -2407,7 +2407,7 @@ func testOperatorNSScope(t *testing.T) {
}
// Prometheus Operator only watches prometheusNS and ruleNS, not arbitraryNS.
_, err := framework.CreatePrometheusOperator(operatorNS, *opImage, []string{prometheusNS, ruleNS}, nil, nil, nil, false)
_, err := framework.CreatePrometheusOperator(operatorNS, *opImage, []string{prometheusNS, ruleNS}, nil, nil, nil, false, true)
if err != nil {
t.Fatal(err)
}

View file

@ -35,10 +35,10 @@ import (
"k8s.io/client-go/tools/clientcmd"
certutil "k8s.io/client-go/util/cert"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1"
)
const (
@ -163,7 +163,7 @@ func (f *Framework) MakeEchoDeployment(group string) *appsv1.Deployment {
// Returns the CA, which can bs used to access the operator over TLS
func (f *Framework) CreatePrometheusOperator(ns, opImage string, namespaceAllowlist,
namespaceDenylist, prometheusInstanceNamespaces, alertmanagerInstanceNamespaces []string,
createRuleAdmissionHooks bool) ([]finalizerFn, error) {
createRuleAdmissionHooks, createClusterRoleBindings bool) ([]finalizerFn, error) {
var finalizers []finalizerFn
@ -187,8 +187,20 @@ func (f *Framework) CreatePrometheusOperator(ns, opImage string, namespaceAllowl
return nil, errors.Wrap(err, "failed to update prometheus cluster role")
}
if _, err := CreateClusterRoleBinding(f.KubeClient, ns, "../../example/rbac/prometheus-operator/prometheus-operator-cluster-role-binding.yaml"); err != nil && !apierrors.IsAlreadyExists(err) {
return nil, errors.Wrap(err, "failed to create prometheus cluster role binding")
if createClusterRoleBindings {
if _, err := CreateClusterRoleBinding(f.KubeClient, ns, "../../example/rbac/prometheus-operator/prometheus-operator-cluster-role-binding.yaml"); err != nil && !apierrors.IsAlreadyExists(err) {
return nil, errors.Wrap(err, "failed to create prometheus cluster role binding")
}
} else {
namespaces := namespaceAllowlist
namespaces = append(namespaces, prometheusInstanceNamespaces...)
namespaces = append(namespaces, alertmanagerInstanceNamespaces...)
for _, n := range namespaces {
if _, err := CreateRoleBindingForSubjectNamespace(f.KubeClient, n, ns, "../framework/ressources/prometheus-operator-role-binding.yaml"); err != nil && !apierrors.IsAlreadyExists(err) {
return nil, errors.Wrap(err, "failed to create prometheus operator role binding")
}
}
}
certBytes, keyBytes, err := certutil.GenerateSelfSignedCertKey(fmt.Sprintf("%s.%s.svc", prometheusOperatorServiceDeploymentName, ns), nil, nil)

View file

@ -0,0 +1,11 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: prometheus-operator
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: prometheus-operator
subjects:
- kind: ServiceAccount
name: prometheus-operator

View file

@ -16,6 +16,7 @@ package framework
import (
"context"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"
@ -33,6 +34,22 @@ func CreateRoleBinding(kubeClient kubernetes.Interface, ns string, relativePath
return finalizerFn, err
}
func CreateRoleBindingForSubjectNamespace(kubeClient kubernetes.Interface, ns, subjectNs string, relativePath string) (finalizerFn, error) {
finalizerFn := func() error { return DeleteRoleBinding(kubeClient, ns, relativePath) }
roleBinding, err := parseRoleBindingYaml(relativePath)
for i := range roleBinding.Subjects {
roleBinding.Subjects[i].Namespace = subjectNs
}
if err != nil {
return finalizerFn, err
}
_, err = kubeClient.RbacV1().RoleBindings(ns).Create(context.TODO(), roleBinding, metav1.CreateOptions{})
return finalizerFn, err
}
func DeleteRoleBinding(kubeClient kubernetes.Interface, ns string, relativePath string) error {
roleBinding, err := parseRoleBindingYaml(relativePath)
if err != nil {