1
0
Fork 0
mirror of https://github.com/prometheus-operator/prometheus-operator.git synced 2025-04-21 11:48:53 +00:00

pkg/informers: initial commit

This commit is contained in:
Sergiusz Urbaniak 2020-08-25 14:47:02 +02:00
parent 4b45d7d46b
commit 54bbe620bb
3 changed files with 288 additions and 0 deletions

180
pkg/informers/informers.go Normal file
View file

@ -0,0 +1,180 @@
// Copyright 2020 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package informers
import (
"strings"
"time"
"github.com/prometheus-operator/prometheus-operator/pkg/listwatch"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
)
type GenericInformer interface {
Informer() cache.SharedIndexInformer
Lister() cache.GenericLister
}
// InformerFactoriesForNamespaces is a simple way to combine several shared informers into a single struct with unified listing power
type InformerFactoriesForNamespaces interface {
ForResource(namespace string, resource schema.GroupVersionResource) (GenericInformer, error)
Namespaces() sets.String
}
type InformersForResource struct {
informers []GenericInformer
}
func NewInformersForResource(ifs InformerFactoriesForNamespaces, resource schema.GroupVersionResource) (*InformersForResource, error) {
namespaces := ifs.Namespaces().List()
var informers []GenericInformer
for _, ns := range namespaces {
informer, err := ifs.ForResource(ns, resource)
if err != nil {
return nil, errors.Wrapf(err, "error getting informer for resource %v", resource)
}
informers = append(informers, informer)
}
return &InformersForResource{
informers: informers,
}, nil
}
func (w *InformersForResource) Start(stopCh <-chan struct{}) {
for _, i := range w.informers {
go i.Informer().Run(stopCh)
}
}
func (w *InformersForResource) GetInformers() []GenericInformer {
return w.informers
}
func (w *InformersForResource) AddEventHandler(handler cache.ResourceEventHandler) {
for _, i := range w.informers {
i.Informer().AddEventHandler(handler)
}
}
func (w *InformersForResource) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
for _, i := range w.informers {
i.Informer().AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
}
}
func (w *InformersForResource) HasSynced() bool {
for _, i := range w.informers {
if !i.Informer().HasSynced() {
return false
}
}
return true
}
func (w *InformersForResource) List(selector labels.Selector) ([]runtime.Object, error) {
var ret []runtime.Object
for _, inf := range w.informers {
objs, err := inf.Lister().List(selector)
if err != nil {
return nil, err
}
ret = append(ret, objs...)
}
return ret, nil
}
func (w *InformersForResource) ListAllByNamespace(namespace string, selector labels.Selector, appendFn cache.AppendFunc) error {
for _, inf := range w.informers {
err := cache.ListAllByNamespace(inf.Informer().GetIndexer(), namespace, selector, appendFn)
if err != nil {
return err
}
}
return nil
}
func (w *InformersForResource) Get(name string) (runtime.Object, error) {
var err error
for _, inf := range w.informers {
var ret runtime.Object
ret, err = inf.Lister().Get(name)
if apierrors.IsNotFound(err) {
continue
}
if err != nil {
return nil, err
}
return ret, nil
}
return nil, err
}
func newInformerOptions(allowedNamespaces, deniedNamespaces map[string]struct{}, tweaks func(*v1.ListOptions)) (func(*v1.ListOptions), []string) {
if tweaks == nil {
tweaks = func(*v1.ListOptions) {} // nop
}
var namespaces []string
if listwatch.IsAllNamespaces(allowedNamespaces) {
namespaces = append(namespaces, v1.NamespaceAll)
return func(options *v1.ListOptions) {
tweaks(options)
denyListTweak(options, deniedNamespaces)
}, namespaces
}
for ns, _ := range allowedNamespaces {
namespaces = append(namespaces, ns)
}
return tweaks, namespaces
}
func denyListTweak(options *metav1.ListOptions, namespaces map[string]struct{}) {
if len(namespaces) == 0 {
return
}
var denied []string
for ns, _ := range namespaces {
denied = append(denied, "metadata.namespace!="+ns)
}
options.FieldSelector = strings.Join(denied, ",")
}

54
pkg/informers/kube.go Normal file
View file

@ -0,0 +1,54 @@
// Copyright 2020 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package informers
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)
func NewKubeInformerFactories(
allowNamespaces, denyNamespaces map[string]struct{},
kubeClient kubernetes.Interface, defaultResync time.Duration, tweakListOptions func(*metav1.ListOptions),
) InformerFactoriesForNamespaces {
tweaks, namespaces := newInformerOptions(
allowNamespaces, denyNamespaces, tweakListOptions,
)
opts := []informers.SharedInformerOption{informers.WithTweakListOptions(tweaks)}
ret := kubeInformersForNamespaces{}
for _, namespace := range namespaces {
opts = append(opts, informers.WithNamespace(namespace))
ret[namespace] = informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResync, opts...)
}
return ret
}
type kubeInformersForNamespaces map[string]informers.SharedInformerFactory
func (i kubeInformersForNamespaces) Namespaces() sets.String {
return sets.StringKeySet(i)
}
func (i kubeInformersForNamespaces) ForResource(namespace string, resource schema.GroupVersionResource) (GenericInformer, error) {
return i[namespace].ForResource(resource)
}

View file

@ -0,0 +1,54 @@
// Copyright 2020 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package informers
import (
"time"
informers "github.com/prometheus-operator/prometheus-operator/pkg/client/informers/externalversions"
monitoring "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
)
func NewMonitoringInformerFactories(
allowNamespaces, denyNamespaces map[string]struct{},
monitoringClient monitoring.Interface, defaultResync time.Duration, tweakListOptions func(*metav1.ListOptions),
) InformerFactoriesForNamespaces {
tweaks, namespaces := newInformerOptions(
allowNamespaces, denyNamespaces, tweakListOptions,
)
opts := []informers.SharedInformerOption{informers.WithTweakListOptions(tweaks)}
ret := monitoringInformersForNamespaces{}
for _, namespace := range namespaces {
opts = append(opts, informers.WithNamespace(namespace))
ret[namespace] = informers.NewSharedInformerFactoryWithOptions(monitoringClient, defaultResync, opts...)
}
return ret
}
type monitoringInformersForNamespaces map[string]informers.SharedInformerFactory
func (i monitoringInformersForNamespaces) Namespaces() sets.String {
return sets.StringKeySet(i)
}
func (i monitoringInformersForNamespaces) ForResource(namespace string, resource schema.GroupVersionResource) (GenericInformer, error) {
return i[namespace].ForResource(resource)
}