mirror of
https://github.com/prometheus-operator/prometheus-operator.git
synced 2025-04-21 19:49:46 +00:00
pkg/prometheus: remove multilistwatcher
This commit is contained in:
parent
f22fd2c7c0
commit
e9ad330bf8
3 changed files with 188 additions and 174 deletions
pkg/prometheus
|
@ -15,8 +15,7 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
|
||||
|
||||
v1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
@ -33,11 +32,15 @@ var (
|
|||
)
|
||||
|
||||
type prometheusCollector struct {
|
||||
store cache.Store
|
||||
stores []cache.Store
|
||||
}
|
||||
|
||||
func NewPrometheusCollector(s cache.Store) *prometheusCollector {
|
||||
return &prometheusCollector{store: s}
|
||||
return &prometheusCollector{stores: []cache.Store{s}}
|
||||
}
|
||||
|
||||
func NewPrometheusCollectorForStores(s ...cache.Store) *prometheusCollector {
|
||||
return &prometheusCollector{stores: s}
|
||||
}
|
||||
|
||||
// Describe implements the prometheus.Collector interface.
|
||||
|
@ -47,8 +50,10 @@ func (c *prometheusCollector) Describe(ch chan<- *prometheus.Desc) {
|
|||
|
||||
// Collect implements the prometheus.Collector interface.
|
||||
func (c *prometheusCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
for _, p := range c.store.List() {
|
||||
c.collectPrometheus(ch, p.(*v1.Prometheus))
|
||||
for _, s := range c.stores {
|
||||
for _, p := range s.List() {
|
||||
c.collectPrometheus(ch, p.(*v1.Prometheus))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
|
||||
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/informers"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/listwatch"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
|
||||
|
@ -41,9 +42,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
@ -61,17 +60,18 @@ type Operator struct {
|
|||
mclient monitoringclient.Interface
|
||||
logger log.Logger
|
||||
|
||||
promInf cache.SharedIndexInformer
|
||||
smonInf cache.SharedIndexInformer
|
||||
pmonInf cache.SharedIndexInformer
|
||||
probeInf cache.SharedIndexInformer
|
||||
ruleInf cache.SharedIndexInformer
|
||||
cmapInf cache.SharedIndexInformer
|
||||
secrInf cache.SharedIndexInformer
|
||||
ssetInf cache.SharedIndexInformer
|
||||
nsPromInf cache.SharedIndexInformer
|
||||
nsMonInf cache.SharedIndexInformer
|
||||
|
||||
promInfs *informers.InformersForResource
|
||||
smonInfs *informers.InformersForResource
|
||||
pmonInfs *informers.InformersForResource
|
||||
probeInfs *informers.InformersForResource
|
||||
ruleInfs *informers.InformersForResource
|
||||
cmapInfs *informers.InformersForResource
|
||||
secrInfs *informers.InformersForResource
|
||||
ssetInfs *informers.InformersForResource
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
|
||||
metrics *operator.Metrics
|
||||
|
@ -248,130 +248,112 @@ func New(ctx context.Context, conf Config, logger log.Logger, r prometheus.Regis
|
|||
}
|
||||
c.metrics.MustRegister(c.nodeAddressLookupErrors, c.nodeEndpointSyncs, c.nodeEndpointSyncErrors)
|
||||
|
||||
c.promInf = cache.NewSharedIndexInformer(
|
||||
c.metrics.NewInstrumentedListerWatcher(
|
||||
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
|
||||
return &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.LabelSelector = c.config.PromSelector
|
||||
return mclient.MonitoringV1().Prometheuses(namespace).List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.LabelSelector = c.config.PromSelector
|
||||
return mclient.MonitoringV1().Prometheuses(namespace).Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
}),
|
||||
c.promInfs, err = informers.NewInformersForResource(
|
||||
informers.NewMonitoringInformerFactories(
|
||||
c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList,
|
||||
mclient, resyncPeriod,
|
||||
func(options *metav1.ListOptions) {
|
||||
options.LabelSelector = c.config.PromSelector
|
||||
},
|
||||
),
|
||||
&monitoringv1.Prometheus{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
c.metrics.MustRegister(NewPrometheusCollector(c.promInf.GetStore()))
|
||||
c.metrics.MustRegister(operator.NewStoreCollector("prometheus", c.promInf.GetStore()))
|
||||
|
||||
c.smonInf = cache.NewSharedIndexInformer(
|
||||
c.metrics.NewInstrumentedListerWatcher(
|
||||
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
|
||||
return &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
return mclient.MonitoringV1().ServiceMonitors(namespace).List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
return mclient.MonitoringV1().ServiceMonitors(namespace).Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
}),
|
||||
),
|
||||
&monitoringv1.ServiceMonitor{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
c.metrics.MustRegister(operator.NewStoreCollector("servicemonitor", c.smonInf.GetStore()))
|
||||
|
||||
c.pmonInf = cache.NewSharedIndexInformer(
|
||||
c.metrics.NewInstrumentedListerWatcher(
|
||||
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
|
||||
return &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
return mclient.MonitoringV1().PodMonitors(namespace).List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
return mclient.MonitoringV1().PodMonitors(namespace).Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
}),
|
||||
),
|
||||
&monitoringv1.PodMonitor{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
c.metrics.MustRegister(operator.NewStoreCollector("podmonitor", c.pmonInf.GetStore()))
|
||||
|
||||
c.probeInf = cache.NewSharedIndexInformer(
|
||||
c.metrics.NewInstrumentedListerWatcher(
|
||||
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
|
||||
return &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (object runtime.Object, err error) {
|
||||
return mclient.MonitoringV1().Probes(namespace).List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (w watch.Interface, err error) {
|
||||
return mclient.MonitoringV1().Probes(namespace).Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
}),
|
||||
),
|
||||
&monitoringv1.Probe{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
c.metrics.MustRegister(operator.NewStoreCollector("probe", c.probeInf.GetStore()))
|
||||
|
||||
c.ruleInf = cache.NewSharedIndexInformer(
|
||||
c.metrics.NewInstrumentedListerWatcher(
|
||||
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
|
||||
return &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
return mclient.MonitoringV1().PrometheusRules(namespace).List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
return mclient.MonitoringV1().PrometheusRules(namespace).Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
}),
|
||||
),
|
||||
&monitoringv1.PrometheusRule{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
c.metrics.MustRegister(operator.NewStoreCollector("prometheurule", c.ruleInf.GetStore()))
|
||||
|
||||
c.cmapInf = cache.NewSharedIndexInformer(
|
||||
c.metrics.NewInstrumentedListerWatcher(
|
||||
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
|
||||
return &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.LabelSelector = labelPrometheusName
|
||||
return c.kclient.CoreV1().ConfigMaps(namespace).List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.LabelSelector = labelPrometheusName
|
||||
return c.kclient.CoreV1().ConfigMaps(namespace).Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
}),
|
||||
),
|
||||
&v1.ConfigMap{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PrometheusName),
|
||||
)
|
||||
|
||||
c.secrInf = cache.NewSharedIndexInformer(
|
||||
c.metrics.NewInstrumentedListerWatcher(
|
||||
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
|
||||
return cache.NewListWatchFromClient(c.kclient.CoreV1().RESTClient(), "secrets", namespace, secretListWatchSelector)
|
||||
}),
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating prometheus informers")
|
||||
}
|
||||
|
||||
var promStores []cache.Store
|
||||
for _, informer := range c.promInfs.GetInformers() {
|
||||
promStores = append(promStores, informer.Informer().GetStore())
|
||||
}
|
||||
c.metrics.MustRegister(NewPrometheusCollectorForStores(promStores...))
|
||||
|
||||
c.smonInfs, err = informers.NewInformersForResource(
|
||||
informers.NewMonitoringInformerFactories(
|
||||
c.config.Namespaces.AllowList, c.config.Namespaces.DenyList,
|
||||
mclient, resyncPeriod, nil,
|
||||
),
|
||||
&v1.Secret{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName),
|
||||
)
|
||||
|
||||
c.ssetInf = cache.NewSharedIndexInformer(
|
||||
c.metrics.NewInstrumentedListerWatcher(
|
||||
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
|
||||
return cache.NewListWatchFromClient(c.kclient.AppsV1().RESTClient(), "statefulsets", namespace, fields.Everything())
|
||||
}),
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating servicemonitor informers")
|
||||
}
|
||||
|
||||
c.pmonInfs, err = informers.NewInformersForResource(
|
||||
informers.NewMonitoringInformerFactories(
|
||||
c.config.Namespaces.AllowList, c.config.Namespaces.DenyList,
|
||||
mclient, resyncPeriod, nil,
|
||||
),
|
||||
&appsv1.StatefulSet{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName),
|
||||
)
|
||||
|
||||
c.probeInfs, err = informers.NewInformersForResource(
|
||||
informers.NewMonitoringInformerFactories(
|
||||
c.config.Namespaces.AllowList, c.config.Namespaces.DenyList,
|
||||
mclient, resyncPeriod, nil,
|
||||
),
|
||||
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ProbeName),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating podmonitor informers")
|
||||
}
|
||||
|
||||
c.ruleInfs, err = informers.NewInformersForResource(
|
||||
informers.NewMonitoringInformerFactories(
|
||||
c.config.Namespaces.AllowList, c.config.Namespaces.DenyList,
|
||||
mclient, resyncPeriod, nil,
|
||||
),
|
||||
monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PrometheusRuleName),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating podmonitor informers")
|
||||
}
|
||||
|
||||
c.cmapInfs, err = informers.NewInformersForResource(
|
||||
informers.NewKubeInformerFactories(
|
||||
c.config.Namespaces.AllowList, c.config.Namespaces.DenyList,
|
||||
c.kclient, resyncPeriod,
|
||||
func(options *metav1.ListOptions) {
|
||||
options.LabelSelector = labelPrometheusName
|
||||
},
|
||||
),
|
||||
v1.SchemeGroupVersion.WithResource(string(v1.ResourceConfigMaps)),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating configmap informers")
|
||||
}
|
||||
|
||||
c.secrInfs, err = informers.NewInformersForResource(
|
||||
informers.NewKubeInformerFactories(
|
||||
c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList,
|
||||
c.kclient, resyncPeriod, func(options *metav1.ListOptions) {
|
||||
options.FieldSelector = strings.Join([]string{options.FieldSelector, secretListWatchSelector.String()}, ",")
|
||||
},
|
||||
),
|
||||
v1.SchemeGroupVersion.WithResource(string(v1.ResourceSecrets)),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating configmap informers")
|
||||
}
|
||||
|
||||
c.ssetInfs, err = informers.NewInformersForResource(
|
||||
informers.NewKubeInformerFactories(
|
||||
c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList,
|
||||
c.kclient, resyncPeriod, nil,
|
||||
),
|
||||
appsv1.SchemeGroupVersion.WithResource("statefulsets"),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating configmap informers")
|
||||
}
|
||||
|
||||
newNamespaceInformer := func(o *Operator, allowList map[string]struct{}) cache.SharedIndexInformer {
|
||||
// nsResyncPeriod is used to control how often the namespace informer
|
||||
// should resync. If the unprivileged ListerWatcher is used, then the
|
||||
|
@ -406,18 +388,34 @@ func New(ctx context.Context, conf Config, logger log.Logger, r prometheus.Regis
|
|||
// waitForCacheSync waits for the informers' caches to be synced.
|
||||
func (c *Operator) waitForCacheSync(ctx context.Context) error {
|
||||
ok := true
|
||||
|
||||
for _, infs := range []struct {
|
||||
name string
|
||||
informersForResource *informers.InformersForResource
|
||||
}{
|
||||
{"Prometheus", c.promInfs},
|
||||
{"ServiceMonitor", c.smonInfs},
|
||||
{"PodMonitor", c.pmonInfs},
|
||||
{"PrometheusRule", c.ruleInfs},
|
||||
{"Probe", c.probeInfs},
|
||||
{"ConfigMap", c.cmapInfs},
|
||||
{"Secret", c.secrInfs},
|
||||
{"StatefulSet", c.ssetInfs},
|
||||
} {
|
||||
for _, inf := range infs.informersForResource.GetInformers() {
|
||||
if !cache.WaitForCacheSync(ctx.Done(), inf.Informer().HasSynced) {
|
||||
level.Error(c.logger).Log("msg", fmt.Sprintf("failed to sync %s cache", infs.name))
|
||||
ok = false
|
||||
} else {
|
||||
level.Debug(c.logger).Log("msg", fmt.Sprintf("successfully synced %s cache", infs.name))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
informers := []struct {
|
||||
name string
|
||||
informer cache.SharedIndexInformer
|
||||
}{
|
||||
{"Prometheus", c.promInf},
|
||||
{"ServiceMonitor", c.smonInf},
|
||||
{"PodMonitor", c.pmonInf},
|
||||
{"Probe", c.probeInf},
|
||||
{"PrometheusRule", c.ruleInf},
|
||||
{"ConfigMap", c.cmapInf},
|
||||
{"Secret", c.secrInf},
|
||||
{"StatefulSet", c.ssetInf},
|
||||
{"PromNamespace", c.nsPromInf},
|
||||
{"MonNamespace", c.nsMonInf},
|
||||
}
|
||||
|
@ -438,42 +436,44 @@ func (c *Operator) waitForCacheSync(ctx context.Context) error {
|
|||
|
||||
// addHandlers adds the eventhandlers to the informers.
|
||||
func (c *Operator) addHandlers() {
|
||||
c.promInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
c.promInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handlePrometheusAdd,
|
||||
DeleteFunc: c.handlePrometheusDelete,
|
||||
UpdateFunc: c.handlePrometheusUpdate,
|
||||
})
|
||||
c.smonInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
|
||||
c.smonInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handleSmonAdd,
|
||||
DeleteFunc: c.handleSmonDelete,
|
||||
UpdateFunc: c.handleSmonUpdate,
|
||||
})
|
||||
c.pmonInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
|
||||
c.pmonInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handlePmonAdd,
|
||||
DeleteFunc: c.handlePmonDelete,
|
||||
UpdateFunc: c.handlePmonUpdate,
|
||||
})
|
||||
c.probeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
c.probeInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handleBmonAdd,
|
||||
UpdateFunc: c.handleBmonUpdate,
|
||||
DeleteFunc: c.handleBmonDelete,
|
||||
})
|
||||
c.ruleInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
c.ruleInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handleRuleAdd,
|
||||
DeleteFunc: c.handleRuleDelete,
|
||||
UpdateFunc: c.handleRuleUpdate,
|
||||
})
|
||||
c.cmapInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
c.cmapInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handleConfigMapAdd,
|
||||
DeleteFunc: c.handleConfigMapDelete,
|
||||
UpdateFunc: c.handleConfigMapUpdate,
|
||||
})
|
||||
c.secrInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
c.secrInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handleSecretAdd,
|
||||
DeleteFunc: c.handleSecretDelete,
|
||||
UpdateFunc: c.handleSecretUpdate,
|
||||
})
|
||||
c.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
c.ssetInfs.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handleStatefulSetAdd,
|
||||
DeleteFunc: c.handleStatefulSetDelete,
|
||||
UpdateFunc: c.handleStatefulSetUpdate,
|
||||
|
@ -507,14 +507,14 @@ func (c *Operator) Run(ctx context.Context) error {
|
|||
|
||||
go c.worker(ctx)
|
||||
|
||||
go c.promInf.Run(ctx.Done())
|
||||
go c.smonInf.Run(ctx.Done())
|
||||
go c.pmonInf.Run(ctx.Done())
|
||||
go c.probeInf.Run(ctx.Done())
|
||||
go c.ruleInf.Run(ctx.Done())
|
||||
go c.cmapInf.Run(ctx.Done())
|
||||
go c.secrInf.Run(ctx.Done())
|
||||
go c.ssetInf.Run(ctx.Done())
|
||||
go c.promInfs.Start(ctx.Done())
|
||||
go c.smonInfs.Start(ctx.Done())
|
||||
go c.pmonInfs.Start(ctx.Done())
|
||||
go c.probeInfs.Start(ctx.Done())
|
||||
go c.ruleInfs.Start(ctx.Done())
|
||||
go c.cmapInfs.Start(ctx.Done())
|
||||
go c.secrInfs.Start(ctx.Done())
|
||||
go c.ssetInfs.Start(ctx.Done())
|
||||
go c.nsMonInf.Run(ctx.Done())
|
||||
if c.nsPromInf != c.nsMonInf {
|
||||
go c.nsPromInf.Run(ctx.Done())
|
||||
|
@ -999,7 +999,9 @@ func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) {
|
|||
}
|
||||
ns := nsObject.(*v1.Namespace)
|
||||
|
||||
err = cache.ListAll(c.promInf.GetStore(), labels.Everything(), func(obj interface{}) {
|
||||
objs, err := c.promInfs.List(labels.Everything())
|
||||
|
||||
for _, obj := range objs {
|
||||
// Check for Prometheus instances in the namespace.
|
||||
p := obj.(*monitoringv1.Prometheus)
|
||||
if p.Namespace == nsName {
|
||||
|
@ -1068,7 +1070,8 @@ func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) {
|
|||
c.enqueue(p)
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
level.Error(c.logger).Log(
|
||||
"msg", "listing all Prometheus instances from cache failed",
|
||||
|
@ -1113,14 +1116,17 @@ func (c *Operator) prometheusForStatefulSet(sset interface{}) *monitoringv1.Prom
|
|||
}
|
||||
|
||||
promKey := statefulSetKeyToPrometheusKey(key)
|
||||
p, exists, err := c.promInf.GetStore().GetByKey(promKey)
|
||||
|
||||
p, err := c.promInfs.Get(promKey)
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
level.Error(c.logger).Log("msg", "Prometheus lookup failed", "err", err)
|
||||
return nil
|
||||
}
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
return p.(*monitoringv1.Prometheus)
|
||||
}
|
||||
|
||||
|
@ -1182,16 +1188,17 @@ func (c *Operator) handleStatefulSetUpdate(oldo, curo interface{}) {
|
|||
}
|
||||
|
||||
func (c *Operator) sync(ctx context.Context, key string) error {
|
||||
obj, exists, err := c.promInf.GetIndexer().GetByKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
pobj, err := c.promInfs.Get(key)
|
||||
|
||||
if apierrors.IsNotFound(err) {
|
||||
// Dependent resources are cleaned up by K8s via OwnerReferences
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p := obj.(*monitoringv1.Prometheus)
|
||||
p := pobj.(*monitoringv1.Prometheus)
|
||||
p = p.DeepCopy()
|
||||
p.APIVersion = monitoringv1.SchemeGroupVersion.String()
|
||||
p.Kind = monitoringv1.PrometheusesKind
|
||||
|
@ -1224,11 +1231,14 @@ func (c *Operator) sync(ctx context.Context, key string) error {
|
|||
|
||||
ssetClient := c.kclient.AppsV1().StatefulSets(p.Namespace)
|
||||
// Ensure we have a StatefulSet running Prometheus deployed.
|
||||
obj, exists, err = c.ssetInf.GetIndexer().GetByKey(prometheusKeyToStatefulSetKey(key))
|
||||
if err != nil {
|
||||
obj, err := c.ssetInfs.Get(prometheusKeyToStatefulSetKey(key))
|
||||
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
return errors.Wrap(err, "retrieving statefulset failed")
|
||||
}
|
||||
|
||||
exists := !apierrors.IsNotFound(err)
|
||||
|
||||
spec := appsv1.StatefulSetSpec{}
|
||||
if obj != nil {
|
||||
ss := obj.(*appsv1.StatefulSet)
|
||||
|
@ -1642,7 +1652,7 @@ func (c *Operator) selectServiceMonitors(ctx context.Context, p *monitoringv1.Pr
|
|||
level.Debug(c.logger).Log("msg", "filtering namespaces to select ServiceMonitors from", "namespaces", strings.Join(namespaces, ","), "namespace", p.Namespace, "prometheus", p.Name)
|
||||
|
||||
for _, ns := range namespaces {
|
||||
cache.ListAllByNamespace(c.smonInf.GetIndexer(), ns, servMonSelector, func(obj interface{}) {
|
||||
c.smonInfs.ListAllByNamespace(ns, servMonSelector, func(obj interface{}) {
|
||||
k, ok := c.keyFunc(obj)
|
||||
if ok {
|
||||
serviceMonitors[k] = obj.(*monitoringv1.ServiceMonitor)
|
||||
|
@ -1730,7 +1740,7 @@ func (c *Operator) selectPodMonitors(p *monitoringv1.Prometheus) (map[string]*mo
|
|||
|
||||
podMonitors := []string{}
|
||||
for _, ns := range namespaces {
|
||||
cache.ListAllByNamespace(c.pmonInf.GetIndexer(), ns, podMonSelector, func(obj interface{}) {
|
||||
c.pmonInfs.ListAllByNamespace(ns, podMonSelector, func(obj interface{}) {
|
||||
k, ok := c.keyFunc(obj)
|
||||
if ok {
|
||||
res[k] = obj.(*monitoringv1.PodMonitor)
|
||||
|
@ -1773,7 +1783,7 @@ func (c *Operator) selectProbes(p *monitoringv1.Prometheus) (map[string]*monitor
|
|||
|
||||
probes := make([]string, 0)
|
||||
for _, ns := range namespaces {
|
||||
cache.ListAllByNamespace(c.probeInf.GetIndexer(), ns, bMonSelector, func(obj interface{}) {
|
||||
c.probeInfs.ListAllByNamespace(ns, bMonSelector, func(obj interface{}) {
|
||||
if k, ok := c.keyFunc(obj); ok {
|
||||
res[k] = obj.(*monitoringv1.Probe)
|
||||
probes = append(probes, k)
|
||||
|
|
|
@ -27,7 +27,6 @@ import (
|
|||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"github.com/ghodss/yaml"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
|
@ -179,7 +178,7 @@ func (c *Operator) selectRules(p *monitoringv1.Prometheus, namespaces []string)
|
|||
|
||||
for _, ns := range namespaces {
|
||||
var marshalErr error
|
||||
err := cache.ListAllByNamespace(c.ruleInf.GetIndexer(), ns, ruleSelector, func(obj interface{}) {
|
||||
err := c.ruleInfs.ListAllByNamespace(ns, ruleSelector, func(obj interface{}) {
|
||||
promRule := obj.(*monitoringv1.PrometheusRule).DeepCopy()
|
||||
|
||||
if err := nsLabeler.EnforceNamespaceLabel(promRule); err != nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue