1
0
Fork 0
mirror of https://github.com/prometheus-operator/prometheus-operator.git synced 2025-04-21 11:48:53 +00:00

pkg/alertmanager: remove multlistwatcher

This commit is contained in:
Sergiusz Urbaniak 2020-08-25 16:11:07 +02:00
parent e9ad330bf8
commit 920f2490d9

View file

@ -23,8 +23,8 @@ import (
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
"github.com/prometheus-operator/prometheus-operator/pkg/informers"
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
"github.com/prometheus-operator/prometheus-operator/pkg/listwatch"
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
prometheusoperator "github.com/prometheus-operator/prometheus-operator/pkg/prometheus"
@ -38,9 +38,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@ -57,8 +55,8 @@ type Operator struct {
mclient monitoringclient.Interface
logger log.Logger
alrtInf cache.SharedIndexInformer
ssetInf cache.SharedIndexInformer
alrtInfs *informers.InformersForResource
ssetInfs *informers.InformersForResource
queue workqueue.RateLimitingInterface
@ -117,56 +115,57 @@ func New(ctx context.Context, c prometheusoperator.Config, logger log.Logger, r
},
}
o.alrtInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.AlertmanagerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = o.config.AlertManagerSelector
return o.mclient.MonitoringV1().Alertmanagers(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = o.config.AlertManagerSelector
return o.mclient.MonitoringV1().Alertmanagers(namespace).Watch(ctx, options)
},
}
}),
o.alrtInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
o.config.Namespaces.AlertmanagerAllowList, o.config.Namespaces.DenyList,
mclient, resyncPeriod,
func(options *metav1.ListOptions) {
options.LabelSelector = o.config.AlertManagerSelector
},
),
&monitoringv1.Alertmanager{}, resyncPeriod, cache.Indexers{},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.AlertmanagerName),
)
o.metrics.MustRegister(NewAlertmanagerCollector(o.alrtInf.GetStore()))
o.metrics.MustRegister(operator.NewStoreCollector("alertmanager", o.alrtInf.GetStore()))
o.ssetInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.AlertmanagerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return cache.NewListWatchFromClient(o.kclient.AppsV1().RESTClient(), "statefulsets", namespace, fields.Everything())
}),
if err != nil {
return nil, errors.Wrap(err, "error creating alertmanager informers")
}
o.ssetInfs, err = informers.NewInformersForResource(
informers.NewKubeInformerFactories(
o.config.Namespaces.AlertmanagerAllowList, o.config.Namespaces.DenyList,
o.kclient, resyncPeriod, nil,
),
&appsv1.StatefulSet{}, resyncPeriod, cache.Indexers{},
appsv1.SchemeGroupVersion.WithResource("statefulsets"),
)
if err != nil {
return nil, errors.Wrap(err, "error creating statefulset informers")
}
return o, nil
}
// waitForCacheSync waits for the informers' caches to be synced.
func (c *Operator) waitForCacheSync(stopc <-chan struct{}) error {
ok := true
informers := []struct {
name string
informer cache.SharedIndexInformer
for _, infs := range []struct {
name string
informersForResource *informers.InformersForResource
}{
{"Alertmanager", c.alrtInf},
{"StatefulSet", c.ssetInf},
}
for _, inf := range informers {
if !cache.WaitForCacheSync(stopc, inf.informer.HasSynced) {
level.Error(c.logger).Log("msg", fmt.Sprintf("failed to sync %s cache", inf.name))
ok = false
} else {
level.Debug(c.logger).Log("msg", fmt.Sprintf("successfully synced %s cache", inf.name))
{"Alertmanager", c.alrtInfs},
{"StatefulSet", c.ssetInfs},
} {
for _, inf := range infs.informersForResource.GetInformers() {
if !cache.WaitForCacheSync(stopc, inf.Informer().HasSynced) {
level.Error(c.logger).Log("msg", fmt.Sprintf("failed to sync %s cache", infs.name))
ok = false
} else {
level.Debug(c.logger).Log("msg", fmt.Sprintf("successfully synced %s cache", infs.name))
}
}
}
if !ok {
return errors.New("failed to sync caches")
}
@ -176,12 +175,12 @@ func (c *Operator) waitForCacheSync(stopc <-chan struct{}) error {
// addHandlers adds the eventhandlers to the informers.
func (c *Operator) addHandlers() {
c.alrtInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.alrtInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleAlertmanagerAdd,
DeleteFunc: c.handleAlertmanagerDelete,
UpdateFunc: c.handleAlertmanagerUpdate,
})
c.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.ssetInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleStatefulSetAdd,
DeleteFunc: c.handleStatefulSetDelete,
UpdateFunc: c.handleStatefulSetUpdate,
@ -215,8 +214,8 @@ func (c *Operator) Run(ctx context.Context) error {
go c.worker(ctx)
go c.alrtInf.Run(ctx.Done())
go c.ssetInf.Run(ctx.Done())
go c.alrtInfs.Start(ctx.Done())
go c.ssetInfs.Start(ctx.Done())
if err := c.waitForCacheSync(ctx.Done()); err != nil {
return err
}
@ -303,14 +302,16 @@ func (c *Operator) alertmanagerForStatefulSet(sset interface{}) *monitoringv1.Al
}
aKey := statefulSetKeyToAlertmanagerKey(key)
a, exists, err := c.alrtInf.GetStore().GetByKey(aKey)
a, err := c.alrtInfs.Get(aKey)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
level.Error(c.logger).Log("msg", "Alertmanager lookup failed", "err", err)
return nil
}
if !exists {
return nil
}
return a.(*monitoringv1.Alertmanager)
}
@ -398,16 +399,17 @@ func (c *Operator) handleStatefulSetUpdate(oldo, curo interface{}) {
}
func (c *Operator) sync(ctx context.Context, key string) error {
obj, exists, err := c.alrtInf.GetIndexer().GetByKey(key)
if err != nil {
return err
}
if !exists {
aobj, err := c.alrtInfs.Get(key)
if apierrors.IsNotFound(err) {
// Dependent resources are cleaned up by K8s via OwnerReferences
return nil
}
if err != nil {
return err
}
am := obj.(*monitoringv1.Alertmanager)
am := aobj.(*monitoringv1.Alertmanager)
am = am.DeepCopy()
am.APIVersion = monitoringv1.SchemeGroupVersion.String()
am.Kind = monitoringv1.AlertmanagersKind
@ -426,11 +428,14 @@ func (c *Operator) sync(ctx context.Context, key string) error {
ssetClient := c.kclient.AppsV1().StatefulSets(am.Namespace)
// Ensure we have a StatefulSet running Alertmanager deployed.
obj, exists, err = c.ssetInf.GetIndexer().GetByKey(alertmanagerKeyToStatefulSetKey(key))
if err != nil {
obj, err := c.ssetInfs.Get(alertmanagerKeyToStatefulSetKey(key))
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "retrieving statefulset failed")
}
exists := !apierrors.IsNotFound(err)
if !exists {
sset, err := makeStatefulSet(am, nil, c.config)
if err != nil {