1
0
Fork 0
mirror of https://github.com/prometheus-operator/prometheus-operator.git synced 2025-04-21 11:48:53 +00:00

pkg/thanos: remove multilistwatcher

This commit is contained in:
Sergiusz Urbaniak 2020-08-25 17:46:45 +02:00
parent 920f2490d9
commit 0c9283465a
2 changed files with 99 additions and 86 deletions

View file

@ -24,10 +24,10 @@ import (
"github.com/mitchellh/hashstructure"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
"github.com/prometheus-operator/prometheus-operator/pkg/informers"
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
"github.com/prometheus-operator/prometheus-operator/pkg/listwatch"
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
prometheusoperator "github.com/prometheus-operator/prometheus-operator/pkg/prometheus"
"github.com/go-kit/kit/log"
@ -41,9 +41,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
@ -62,12 +60,13 @@ type Operator struct {
mclient monitoringclient.Interface
logger log.Logger
thanosRulerInf cache.SharedIndexInformer
thanosRulerInfs *informers.InformersForResource
cmapInfs *informers.InformersForResource
ruleInfs *informers.InformersForResource
ssetInfs *informers.InformersForResource
nsThanosRulerInf cache.SharedIndexInformer
nsRuleInf cache.SharedIndexInformer
cmapInf cache.SharedIndexInformer
ruleInf cache.SharedIndexInformer
ssetInf cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
@ -137,69 +136,60 @@ func New(ctx context.Context, conf prometheusoperator.Config, logger log.Logger,
},
}
o.cmapInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.ThanosRulerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = labelThanosRulerName
return o.kclient.CoreV1().ConfigMaps(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelThanosRulerName
return o.kclient.CoreV1().ConfigMaps(namespace).Watch(ctx, options)
},
}
}),
o.cmapInfs, err = informers.NewInformersForResource(
informers.NewKubeInformerFactories(
o.config.Namespaces.ThanosRulerAllowList, o.config.Namespaces.DenyList,
o.kclient, resyncPeriod,
func(options *metav1.ListOptions) {
options.LabelSelector = labelThanosRulerName
},
),
&v1.ConfigMap{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
v1.SchemeGroupVersion.WithResource(string(v1.ResourceConfigMaps)),
)
o.thanosRulerInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.ThanosRulerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = o.config.ThanosRulerSelector
return o.mclient.MonitoringV1().ThanosRulers(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = o.config.ThanosRulerSelector
return o.mclient.MonitoringV1().ThanosRulers(namespace).Watch(ctx, options)
},
}
}),
if err != nil {
return nil, errors.Wrap(err, "error creating configmap informers")
}
o.thanosRulerInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
o.config.Namespaces.ThanosRulerAllowList, o.config.Namespaces.DenyList,
mclient, resyncPeriod,
func(options *metav1.ListOptions) {
options.LabelSelector = o.config.ThanosRulerSelector
},
),
&monitoringv1.ThanosRuler{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ThanosRulerName),
)
o.metrics.MustRegister(NewThanosRulerCollector(o.thanosRulerInf.GetStore()))
o.metrics.MustRegister(operator.NewStoreCollector("thanosruler", o.thanosRulerInf.GetStore()))
if err != nil {
return nil, errors.Wrap(err, "error creating thanosruler informers")
}
o.ruleInf = cache.NewSharedIndexInformer(
o.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.AllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).Watch(ctx, options)
},
}
}),
o.ruleInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
o.config.Namespaces.AllowList, o.config.Namespaces.DenyList,
mclient, resyncPeriod, nil,
),
&monitoringv1.PrometheusRule{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PrometheusRuleName),
)
o.metrics.MustRegister(operator.NewStoreCollector("prometheusrule", o.thanosRulerInf.GetStore()))
o.ssetInf = cache.NewSharedIndexInformer(
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.ThanosRulerAllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return cache.NewListWatchFromClient(o.kclient.AppsV1().RESTClient(), "statefulsets", namespace, fields.Everything())
}),
&appsv1.StatefulSet{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
if err != nil {
return nil, errors.Wrap(err, "error creating prometheusrule informers")
}
o.ssetInfs, err = informers.NewInformersForResource(
informers.NewKubeInformerFactories(
o.config.Namespaces.ThanosRulerAllowList, o.config.Namespaces.DenyList,
o.kclient, resyncPeriod, nil,
),
appsv1.SchemeGroupVersion.WithResource("statefulsets"),
)
if err != nil {
return nil, errors.Wrap(err, "error creating statefulset informers")
}
newNamespaceInformer := func(o *Operator, allowList map[string]struct{}) cache.SharedIndexInformer {
// nsResyncPeriod is used to control how often the namespace informer
// should resync. If the unprivileged ListerWatcher is used, then the
@ -234,16 +224,32 @@ func New(ctx context.Context, conf prometheusoperator.Config, logger log.Logger,
// waitForCacheSync waits for the informers' caches to be synced.
func (o *Operator) waitForCacheSync(stopc <-chan struct{}) error {
ok := true
for _, infs := range []struct {
name string
informersForResource *informers.InformersForResource
}{
{"ThanosRuler", o.thanosRulerInfs},
{"ConfigMap", o.cmapInfs},
{"PrometheusRule", o.ruleInfs},
{"StatefulSet", o.ssetInfs},
} {
for _, inf := range infs.informersForResource.GetInformers() {
if !cache.WaitForCacheSync(stopc, inf.Informer().HasSynced) {
level.Error(o.logger).Log("msg", fmt.Sprintf("failed to sync %s cache", infs.name))
ok = false
} else {
level.Debug(o.logger).Log("msg", fmt.Sprintf("successfully synced %s cache", infs.name))
}
}
}
informers := []struct {
name string
informer cache.SharedIndexInformer
}{
{"ThanosRuler", o.thanosRulerInf},
{"ThanosRulerNamespace", o.nsThanosRulerInf},
{"RuleNamespace", o.nsRuleInf},
{"ConfigMap", o.cmapInf},
{"PrometheusRule", o.ruleInf},
{"StatefulSet", o.ssetInf},
}
for _, inf := range informers {
if !cache.WaitForCacheSync(stopc, inf.informer.HasSynced) {
@ -262,22 +268,22 @@ func (o *Operator) waitForCacheSync(stopc <-chan struct{}) error {
// addHandlers adds the eventhandlers to the informers.
func (o *Operator) addHandlers() {
o.thanosRulerInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
o.thanosRulerInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleThanosRulerAdd,
DeleteFunc: o.handleThanosRulerDelete,
UpdateFunc: o.handleThanosRulerUpdate,
})
o.cmapInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
o.cmapInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleConfigMapAdd,
DeleteFunc: o.handleConfigMapDelete,
UpdateFunc: o.handleConfigMapUpdate,
})
o.ruleInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
o.ruleInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleRuleAdd,
DeleteFunc: o.handleRuleDelete,
UpdateFunc: o.handleRuleUpdate,
})
o.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
o.ssetInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleStatefulSetAdd,
DeleteFunc: o.handleStatefulSetDelete,
UpdateFunc: o.handleStatefulSetUpdate,
@ -311,14 +317,14 @@ func (o *Operator) Run(ctx context.Context) error {
go o.worker(ctx)
go o.thanosRulerInf.Run(ctx.Done())
go o.cmapInf.Run(ctx.Done())
go o.ruleInf.Run(ctx.Done())
go o.thanosRulerInfs.Start(ctx.Done())
go o.cmapInfs.Start(ctx.Done())
go o.ruleInfs.Start(ctx.Done())
go o.nsRuleInf.Run(ctx.Done())
if o.nsRuleInf != o.nsThanosRulerInf {
go o.nsThanosRulerInf.Run(ctx.Done())
}
go o.ssetInf.Run(ctx.Done())
go o.ssetInfs.Start(ctx.Done())
if err := o.waitForCacheSync(ctx.Done()); err != nil {
return err
}
@ -453,14 +459,16 @@ func (o *Operator) thanosForStatefulSet(sset interface{}) *monitoringv1.ThanosRu
}
thanosKey := statefulSetKeyToThanosKey(key)
tr, exists, err := o.thanosRulerInf.GetStore().GetByKey(thanosKey)
tr, err := o.thanosRulerInfs.Get(thanosKey)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
level.Error(o.logger).Log("msg", "ThanosRuler lookup failed", "err", err)
return nil
}
if !exists {
return nil
}
return tr.(*monitoringv1.ThanosRuler)
}
@ -569,16 +577,17 @@ func (o *Operator) processNextWorkItem(ctx context.Context) bool {
}
func (o *Operator) sync(ctx context.Context, key string) error {
obj, exists, err := o.thanosRulerInf.GetIndexer().GetByKey(key)
if err != nil {
return err
}
if !exists {
trobj, err := o.thanosRulerInfs.Get(key)
if apierrors.IsNotFound(err) {
// Dependent resources are cleaned up by K8s via OwnerReferences
return nil
}
if err != nil {
return err
}
tr := obj.(*monitoringv1.ThanosRuler)
tr := trobj.(*monitoringv1.ThanosRuler)
tr = tr.DeepCopy()
tr.APIVersion = monitoringv1.SchemeGroupVersion.String()
tr.Kind = monitoringv1.ThanosRulerKind
@ -602,11 +611,14 @@ func (o *Operator) sync(ctx context.Context, key string) error {
// Ensure we have a StatefulSet running Thanos deployed.
ssetClient := o.kclient.AppsV1().StatefulSets(tr.Namespace)
obj, exists, err = o.ssetInf.GetIndexer().GetByKey(thanosKeyToStatefulSetKey(key))
if err != nil {
obj, err := o.ssetInfs.Get(thanosKeyToStatefulSetKey(key))
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "retrieving statefulset failed")
}
exists := !apierrors.IsNotFound(err)
if !exists {
sset, err := makeStatefulSet(tr, o.config, ruleConfigMapNames, "")
if err != nil {
@ -787,7 +799,9 @@ func (o *Operator) enqueueForNamespace(store cache.Store, nsName string) {
}
ns := nsObject.(*v1.Namespace)
err = cache.ListAll(o.thanosRulerInf.GetStore(), labels.Everything(), func(obj interface{}) {
objs, err := o.thanosRulerInfs.List(labels.Everything())
for _, obj := range objs {
// Check for ThanosRuler instances in the namespace.
tr := obj.(*monitoringv1.ThanosRuler)
if tr.Namespace == nsName {
@ -810,7 +824,7 @@ func (o *Operator) enqueueForNamespace(store cache.Store, nsName string) {
o.enqueue(tr)
return
}
})
}
if err != nil {
level.Error(o.logger).Log(
"msg", "listing all ThanosRuler instances from cache failed",

View file

@ -27,7 +27,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"github.com/ghodss/yaml"
"github.com/go-kit/kit/log/level"
@ -179,7 +178,7 @@ func (o *Operator) selectRules(t *monitoringv1.ThanosRuler, namespaces []string)
for _, ns := range namespaces {
var marshalErr error
err := cache.ListAllByNamespace(o.ruleInf.GetIndexer(), ns, ruleSelector, func(obj interface{}) {
err := o.ruleInfs.ListAllByNamespace(ns, ruleSelector, func(obj interface{}) {
promRule := obj.(*monitoringv1.PrometheusRule).DeepCopy()
if err := nsLabeler.EnforceNamespaceLabel(promRule); err != nil {