
*: pass context.Context to client-go functions

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
Simon Pasquier 2020-08-31 16:49:38 +02:00
parent d1e9fc77e2
commit 053da63f0b
12 changed files with 188 additions and 181 deletions
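The pattern is the same in every file below: client-go calls that previously received a throwaway context.TODO() now take a context.Context handed down from the caller, so cancellation in Main() propagates to every in-flight API request. A minimal sketch of the before/after, using client-go's fake clientset so it runs without a cluster (namespace and names are illustrative):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// The caller owns the context; cancelling it aborts in-flight requests.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := fake.NewSimpleClientset()

	// Before: client.CoreV1().Pods("default").List(context.TODO(), ...)
	// After: the caller's ctx is passed through instead.
	pods, err := client.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
	if err != nil {
		fmt.Println("listing pods failed:", err)
		return
	}
	fmt.Println("pods:", len(pods.Items))
}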

View file

@@ -244,22 +244,28 @@ func Main() int {
cfg.Namespaces.ThanosRulerAllowList = cfg.Namespaces.AllowList
}
ctx, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx)
r := prometheus.NewRegistry()
po, err := prometheuscontroller.New(cfg, log.With(logger, "component", "prometheusoperator"), r)
po, err := prometheuscontroller.New(ctx, cfg, log.With(logger, "component", "prometheusoperator"), r)
if err != nil {
fmt.Fprint(os.Stderr, "instantiating prometheus controller failed: ", err)
cancel()
return 1
}
ao, err := alertmanagercontroller.New(cfg, log.With(logger, "component", "alertmanageroperator"), r)
ao, err := alertmanagercontroller.New(ctx, cfg, log.With(logger, "component", "alertmanageroperator"), r)
if err != nil {
fmt.Fprint(os.Stderr, "instantiating alertmanager controller failed: ", err)
cancel()
return 1
}
to, err := thanoscontroller.New(cfg, log.With(logger, "component", "thanosoperator"), r)
to, err := thanoscontroller.New(ctx, cfg, log.With(logger, "component", "thanosoperator"), r)
if err != nil {
fmt.Fprint(os.Stderr, "instantiating thanos controller failed: ", err)
cancel()
return 1
}
@@ -267,6 +273,7 @@ func Main() int {
web, err := api.New(cfg, log.With(logger, "component", "api"))
if err != nil {
fmt.Fprint(os.Stderr, "instantiating api failed: ", err)
cancel()
return 1
}
admit := admission.New(log.With(logger, "component", "admissionwebhook"))
@@ -276,6 +283,7 @@ func Main() int {
l, err := net.Listen("tcp", cfg.ListenAddress)
if err != nil {
fmt.Fprint(os.Stderr, "listening failed", cfg.ListenAddress, err)
cancel()
return 1
}
@@ -288,6 +296,7 @@ func Main() int {
cfg.ServerTLSConfig.ClientCAFile, cfg.ServerTLSConfig.MinVersion, cfg.ServerTLSConfig.CipherSuites)
if tlsConfig == nil || err != nil {
fmt.Fprint(os.Stderr, "invalid TLS config", err)
cancel()
return 1
}
}
@@ -321,12 +330,9 @@ func Main() int {
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
ctx, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error { return po.Run(ctx.Done()) })
wg.Go(func() error { return ao.Run(ctx.Done()) })
wg.Go(func() error { return to.Run(ctx.Done()) })
wg.Go(func() error { return po.Run(ctx) })
wg.Go(func() error { return ao.Run(ctx) })
wg.Go(func() error { return to.Run(ctx) })
if tlsConfig != nil {
r, err := rbacproxytls.NewCertReloader(
@@ -336,6 +342,7 @@ func Main() int {
)
if err != nil {
fmt.Fprint(os.Stderr, "failed to initialize certificate reloader", err)
cancel()
return 1
}
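The hunks above hoist the context creation to the top of Main() so every early-return path can call cancel(), and the operators' Run methods now receive the errgroup's derived context instead of a bare stop channel. A runnable sketch of that shape, assuming golang.org/x/sync/errgroup (Run is a stand-in for the operators' Run(ctx)):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Run stands in for an operator's Run(ctx); it blocks until cancellation.
func Run(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // every early-return path can now release the context

	wg, ctx := errgroup.WithContext(ctx)
	wg.Go(func() error { return Run(ctx) })

	cancel() // simulate shutdown; every Run sees ctx.Done() close
	if err := wg.Wait(); err != nil {
		fmt.Println("run failed:", err)
	}
}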

View file

@@ -81,7 +81,7 @@ type Config struct {
}
// New creates a new controller.
func New(c prometheusoperator.Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
func New(ctx context.Context, c prometheusoperator.Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
cfg, err := k8sutil.NewClusterConfig(c.Host, c.TLSInsecure, &c.TLSConfig)
if err != nil {
return nil, errors.Wrap(err, "instantiating cluster config failed")
@@ -123,11 +123,11 @@ func New(c prometheusoperator.Config, logger log.Logger, r prometheus.Registerer
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = o.config.AlertManagerSelector
return o.mclient.MonitoringV1().Alertmanagers(namespace).List(context.TODO(), options)
return o.mclient.MonitoringV1().Alertmanagers(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = o.config.AlertManagerSelector
return o.mclient.MonitoringV1().Alertmanagers(namespace).Watch(context.TODO(), options)
return o.mclient.MonitoringV1().Alertmanagers(namespace).Watch(ctx, options)
},
}
}),
@@ -189,7 +189,7 @@ func (c *Operator) addHandlers() {
}
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
func (c *Operator) Run(ctx context.Context) error {
defer c.queue.ShutDown()
errChan := make(chan error)
@@ -209,20 +209,20 @@ func (c *Operator) Run(stopc <-chan struct{}) error {
return err
}
level.Info(c.logger).Log("msg", "CRD API endpoints ready")
case <-stopc:
case <-ctx.Done():
return nil
}
go c.worker()
go c.worker(ctx)
go c.alrtInf.Run(stopc)
go c.ssetInf.Run(stopc)
if err := c.waitForCacheSync(stopc); err != nil {
go c.alrtInf.Run(ctx.Done())
go c.ssetInf.Run(ctx.Done())
if err := c.waitForCacheSync(ctx.Done()); err != nil {
return err
}
c.addHandlers()
<-stopc
<-ctx.Done()
return nil
}
@@ -270,12 +270,12 @@ func (c *Operator) enqueue(obj interface{}) {
// worker runs a worker thread that just dequeues items, processes them
// and marks them done. It enforces that the syncHandler is never invoked
// concurrently with the same key.
func (c *Operator) worker() {
for c.processNextWorkItem() {
func (c *Operator) worker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Operator) processNextWorkItem() bool {
func (c *Operator) processNextWorkItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
@@ -283,7 +283,7 @@ func (c *Operator) processNextWorkItem() bool {
defer c.queue.Done(key)
c.metrics.ReconcileCounter().Inc()
err := c.sync(key.(string))
err := c.sync(ctx, key.(string))
if err == nil {
c.queue.Forget(key)
return true
@@ -397,7 +397,7 @@ func (c *Operator) handleStatefulSetUpdate(oldo, curo interface{}) {
}
}
func (c *Operator) sync(key string) error {
func (c *Operator) sync(ctx context.Context, key string) error {
obj, exists, err := c.alrtInf.GetIndexer().GetByKey(key)
if err != nil {
return err
@@ -420,7 +420,7 @@ func (c *Operator) sync(key string) error {
// Create governing service if it doesn't exist.
svcClient := c.kclient.CoreV1().Services(am.Namespace)
if err = k8sutil.CreateOrUpdateService(svcClient, makeStatefulSetService(am, c.config)); err != nil {
if err = k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(am, c.config)); err != nil {
return errors.Wrap(err, "synchronizing governing service failed")
}
@@ -437,7 +437,7 @@ func (c *Operator) sync(key string) error {
return errors.Wrap(err, "making the statefulset, to create, failed")
}
operator.SanitizeSTS(sset)
if _, err := ssetClient.Create(context.TODO(), sset, metav1.CreateOptions{}); err != nil {
if _, err := ssetClient.Create(ctx, sset, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "creating statefulset failed")
}
return nil
@@ -449,14 +449,14 @@ func (c *Operator) sync(key string) error {
}
operator.SanitizeSTS(sset)
_, err = ssetClient.Update(context.TODO(), sset, metav1.UpdateOptions{})
_, err = ssetClient.Update(ctx, sset, metav1.UpdateOptions{})
sErr, ok := err.(*apierrors.StatusError)
if ok && sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {
c.metrics.StsDeleteCreateCounter().Inc()
level.Info(c.logger).Log("msg", "resolving illegal update of Alertmanager StatefulSet", "details", sErr.ErrStatus.Details)
propagationPolicy := metav1.DeletePropagationForeground
if err := ssetClient.Delete(context.TODO(), sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
if err := ssetClient.Delete(ctx, sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
return errors.Wrap(err, "failed to delete StatefulSet to avoid forbidden action")
}
return nil
@@ -492,14 +492,14 @@ func ListOptions(name string) metav1.ListOptions {
}
}
func AlertmanagerStatus(kclient kubernetes.Interface, a *monitoringv1.Alertmanager) (*monitoringv1.AlertmanagerStatus, []v1.Pod, error) {
func AlertmanagerStatus(ctx context.Context, kclient kubernetes.Interface, a *monitoringv1.Alertmanager) (*monitoringv1.AlertmanagerStatus, []v1.Pod, error) {
res := &monitoringv1.AlertmanagerStatus{Paused: a.Spec.Paused}
pods, err := kclient.CoreV1().Pods(a.Namespace).List(context.TODO(), ListOptions(a.Name))
pods, err := kclient.CoreV1().Pods(a.Namespace).List(ctx, ListOptions(a.Name))
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving pods of failed")
}
sset, err := kclient.AppsV1().StatefulSets(a.Namespace).Get(context.TODO(), statefulSetNameFromAlertmanagerName(a.Name), metav1.GetOptions{})
sset, err := kclient.AppsV1().StatefulSets(a.Namespace).Get(ctx, statefulSetNameFromAlertmanagerName(a.Name), metav1.GetOptions{})
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving stateful set failed")
}
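Note that the informers above keep their stop-channel signature: ctx.Done() yields a <-chan struct{}, so the new context plugs directly into APIs that still expect a stopc. A small runnable sketch of that bridge (runLoop is a hypothetical stand-in for an informer's Run):

package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop stands in for an informer's Run(stopCh <-chan struct{}).
func runLoop(stopCh <-chan struct{}) {
	<-stopCh
	fmt.Println("informer stopped")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// ctx.Done() is a <-chan struct{}, so it slots into the old stopc APIs.
	go runLoop(ctx.Done())

	time.Sleep(10 * time.Millisecond)
	cancel()
	time.Sleep(10 * time.Millisecond)
}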

View file

@@ -15,7 +15,6 @@
package api
import (
"context"
"encoding/json"
"net/http"
"regexp"
@@ -100,7 +99,7 @@ func parsePrometheusStatusUrl(path string) objectReference {
func (api *API) prometheusStatus(w http.ResponseWriter, req *http.Request) {
or := parsePrometheusStatusUrl(req.URL.Path)
p, err := api.mclient.MonitoringV1().Prometheuses(or.namespace).Get(context.TODO(), or.name, metav1.GetOptions{})
p, err := api.mclient.MonitoringV1().Prometheuses(or.namespace).Get(req.Context(), or.name, metav1.GetOptions{})
if err != nil {
if k8sutil.IsResourceNotFoundError(err) {
w.WriteHeader(404)
@@ -109,7 +108,7 @@ func (api *API) prometheusStatus(w http.ResponseWriter, req *http.Request) {
return
}
p.Status, _, err = prometheus.PrometheusStatus(api.kclient, p)
p.Status, _, err = prometheus.PrometheusStatus(req.Context(), api.kclient, p)
if err != nil {
api.logger.Log("error", err)
}
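Switching the handler from context.TODO() to req.Context() ties the Kubernetes API calls to the lifetime of the HTTP request: when the client disconnects, the downstream Get is cancelled too. A minimal net/http sketch of the idea (the route and the simulated work are made up):

package main

import (
	"fmt"
	"net/http"
	"time"
)

func prometheusStatus(w http.ResponseWriter, req *http.Request) {
	// req.Context() is cancelled when the client disconnects, so any
	// downstream call handed this context is bounded by the request.
	select {
	case <-time.After(time.Second): // stand-in for the client-go Get
		fmt.Fprintln(w, "status ok")
	case <-req.Context().Done():
		return // client went away; stop working
	}
}

func main() {
	http.HandleFunc("/status", prometheusStatus)
	fmt.Println(http.ListenAndServe(":8080", nil))
}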

View file

@@ -105,21 +105,21 @@ func IsResourceNotFoundError(err error) bool {
return false
}
func CreateOrUpdateService(sclient clientv1.ServiceInterface, svc *v1.Service) error {
service, err := sclient.Get(context.TODO(), svc.Name, metav1.GetOptions{})
func CreateOrUpdateService(ctx context.Context, sclient clientv1.ServiceInterface, svc *v1.Service) error {
service, err := sclient.Get(ctx, svc.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "retrieving service object failed")
}
if apierrors.IsNotFound(err) {
_, err = sclient.Create(context.TODO(), svc, metav1.CreateOptions{})
_, err = sclient.Create(ctx, svc, metav1.CreateOptions{})
if err != nil {
return errors.Wrap(err, "creating service object failed")
}
} else {
svc.ResourceVersion = service.ResourceVersion
svc.SetOwnerReferences(mergeOwnerReferences(service.GetOwnerReferences(), svc.GetOwnerReferences()))
_, err := sclient.Update(context.TODO(), svc, metav1.UpdateOptions{})
_, err := sclient.Update(ctx, svc, metav1.UpdateOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "updating service object failed")
}
@@ -128,20 +128,20 @@ func CreateOrUpdateService(sclient clientv1.ServiceInterface, svc *v1.Service) e
return nil
}
func CreateOrUpdateEndpoints(eclient clientv1.EndpointsInterface, eps *v1.Endpoints) error {
endpoints, err := eclient.Get(context.TODO(), eps.Name, metav1.GetOptions{})
func CreateOrUpdateEndpoints(ctx context.Context, eclient clientv1.EndpointsInterface, eps *v1.Endpoints) error {
endpoints, err := eclient.Get(ctx, eps.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "retrieving existing kubelet endpoints object failed")
}
if apierrors.IsNotFound(err) {
_, err = eclient.Create(context.TODO(), eps, metav1.CreateOptions{})
_, err = eclient.Create(ctx, eps, metav1.CreateOptions{})
if err != nil {
return errors.Wrap(err, "creating kubelet endpoints object failed")
}
} else {
eps.ResourceVersion = endpoints.ResourceVersion
_, err = eclient.Update(context.TODO(), eps, metav1.UpdateOptions{})
_, err = eclient.Update(ctx, eps, metav1.UpdateOptions{})
if err != nil {
return errors.Wrap(err, "updating kubelet endpoints object failed")
}
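CreateOrUpdateService and CreateOrUpdateEndpoints share the same get-then-create-or-update shape, now with ctx threaded into every call. A self-contained sketch of the service variant against the fake clientset (simplified from the function above):

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
	clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// createOrUpdateService mirrors the function above in simplified form.
func createOrUpdateService(ctx context.Context, sclient clientv1.ServiceInterface, svc *v1.Service) error {
	existing, err := sclient.Get(ctx, svc.Name, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		_, err = sclient.Create(ctx, svc, metav1.CreateOptions{})
		return err
	}
	if err != nil {
		return err
	}
	// Updates must carry the live ResourceVersion to avoid conflicts.
	svc.ResourceVersion = existing.ResourceVersion
	_, err = sclient.Update(ctx, svc, metav1.UpdateOptions{})
	return err
}

func main() {
	client := fake.NewSimpleClientset()
	svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "demo"}}
	if err := createOrUpdateService(context.Background(), client.CoreV1().Services("default"), svc); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("service ensured")
}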

View file

@@ -108,7 +108,7 @@ func assetKeyFunc(obj interface{}) (string, error) {
}
// addTLSConfig processes the given *TLSConfig and adds the referenced CA, certificate and key to the store.
func (a *assetStore) addTLSConfig(ns string, tlsConfig *monitoringv1.TLSConfig) error {
func (a *assetStore) addTLSConfig(ctx context.Context, ns string, tlsConfig *monitoringv1.TLSConfig) error {
if tlsConfig == nil {
return nil
}
@@ -121,10 +121,10 @@ func (a *assetStore) addTLSConfig(ns string, tlsConfig *monitoringv1.TLSConfig)
switch {
case tlsConfig.CA.Secret != nil:
ca, err = a.getSecretKey(ns, *tlsConfig.CA.Secret)
ca, err = a.getSecretKey(ctx, ns, *tlsConfig.CA.Secret)
case tlsConfig.CA.ConfigMap != nil:
ca, err = a.getConfigMapKey(ns, *tlsConfig.CA.ConfigMap)
ca, err = a.getConfigMapKey(ctx, ns, *tlsConfig.CA.ConfigMap)
}
if err != nil {
@@ -142,10 +142,10 @@ func (a *assetStore) addTLSConfig(ns string, tlsConfig *monitoringv1.TLSConfig)
switch {
case tlsConfig.Cert.Secret != nil:
cert, err = a.getSecretKey(ns, *tlsConfig.Cert.Secret)
cert, err = a.getSecretKey(ctx, ns, *tlsConfig.Cert.Secret)
case tlsConfig.Cert.ConfigMap != nil:
cert, err = a.getConfigMapKey(ns, *tlsConfig.Cert.ConfigMap)
cert, err = a.getConfigMapKey(ctx, ns, *tlsConfig.Cert.ConfigMap)
}
if err != nil {
@@ -156,7 +156,7 @@ func (a *assetStore) addTLSConfig(ns string, tlsConfig *monitoringv1.TLSConfig)
}
if tlsConfig.KeySecret != nil {
key, err := a.getSecretKey(ns, *tlsConfig.KeySecret)
key, err := a.getSecretKey(ctx, ns, *tlsConfig.KeySecret)
if err != nil {
return errors.Wrap(err, "failed to get key")
}
@@ -167,17 +167,17 @@ func (a *assetStore) addTLSConfig(ns string, tlsConfig *monitoringv1.TLSConfig)
}
// addBasicAuth processes the given *BasicAuth and adds the referenced credentials to the store.
func (a *assetStore) addBasicAuth(ns string, ba *monitoringv1.BasicAuth, key string) error {
func (a *assetStore) addBasicAuth(ctx context.Context, ns string, ba *monitoringv1.BasicAuth, key string) error {
if ba == nil {
return nil
}
username, err := a.getSecretKey(ns, ba.Username)
username, err := a.getSecretKey(ctx, ns, ba.Username)
if err != nil {
return errors.Wrap(err, "failed to get basic auth username")
}
password, err := a.getSecretKey(ns, ba.Password)
password, err := a.getSecretKey(ctx, ns, ba.Password)
if err != nil {
return errors.Wrap(err, "failed to get basic auth password")
}
@@ -191,12 +191,12 @@ func (a *assetStore) addBasicAuth(ns string, ba *monitoringv1.BasicAuth, key str
}
// addBearerToken processes the given SecretKeySelector and adds the referenced data to the store.
func (a *assetStore) addBearerToken(ns string, sel v1.SecretKeySelector, key string) error {
func (a *assetStore) addBearerToken(ctx context.Context, ns string, sel v1.SecretKeySelector, key string) error {
if sel.Name == "" {
return nil
}
bearerToken, err := a.getSecretKey(ns, sel)
bearerToken, err := a.getSecretKey(ctx, ns, sel)
if err != nil {
return errors.Wrap(err, "failed to get bearer token")
}
@@ -206,7 +206,7 @@ func (a *assetStore) addBearerToken(ns string, sel v1.SecretKeySelector, key str
return nil
}
func (a *assetStore) getConfigMapKey(namespace string, sel v1.ConfigMapKeySelector) (string, error) {
func (a *assetStore) getConfigMapKey(ctx context.Context, namespace string, sel v1.ConfigMapKeySelector) (string, error) {
obj, exists, err := a.objStore.Get(&v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: sel.Name,
@@ -218,7 +218,7 @@ func (a *assetStore) getConfigMapKey(namespace string, sel v1.ConfigMapKeySelect
}
if !exists {
cm, err := a.cmClient.ConfigMaps(namespace).Get(context.TODO(), sel.Name, metav1.GetOptions{})
cm, err := a.cmClient.ConfigMaps(namespace).Get(ctx, sel.Name, metav1.GetOptions{})
if err != nil {
return "", errors.Wrapf(err, "unable to get configmap %q", sel.Name)
}
@@ -236,7 +236,7 @@ func (a *assetStore) getConfigMapKey(namespace string, sel v1.ConfigMapKeySelect
return cm.Data[sel.Key], nil
}
func (a *assetStore) getSecretKey(namespace string, sel v1.SecretKeySelector) (string, error) {
func (a *assetStore) getSecretKey(ctx context.Context, namespace string, sel v1.SecretKeySelector) (string, error) {
obj, exists, err := a.objStore.Get(&v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: sel.Name,
@@ -248,7 +248,7 @@ func (a *assetStore) getSecretKey(namespace string, sel v1.SecretKeySelector) (s
}
if !exists {
secret, err := a.sClient.Secrets(namespace).Get(context.TODO(), sel.Name, metav1.GetOptions{})
secret, err := a.sClient.Secrets(namespace).Get(ctx, sel.Name, metav1.GetOptions{})
if err != nil {
return "", errors.Wrapf(err, "unable to get secret %q", sel.Name)
}
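getConfigMapKey and getSecretKey consult the local object store first and fall back to a live API call on a cache miss, which is exactly the call that now honours ctx. A simplified sketch of the fallback path (cache step elided; fake clientset, illustrative names):

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// getSecretKey sketches only the cache-miss path: a live Get against the
// API server, which is the call that now takes the caller's ctx.
func getSecretKey(ctx context.Context, client kubernetes.Interface, ns string, sel v1.SecretKeySelector) (string, error) {
	secret, err := client.CoreV1().Secrets(ns).Get(ctx, sel.Name, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	return string(secret.Data[sel.Key]), nil
}

func main() {
	client := fake.NewSimpleClientset(&v1.Secret{
		ObjectMeta: metav1.ObjectMeta{Name: "creds", Namespace: "default"},
		Data:       map[string][]byte{"token": []byte("s3cr3t")},
	})
	sel := v1.SecretKeySelector{
		LocalObjectReference: v1.LocalObjectReference{Name: "creds"},
		Key:                  "token",
	}
	val, err := getSecretKey(context.Background(), client, "default", sel)
	fmt.Println(val, err)
}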

View file

@@ -15,6 +15,7 @@
package prometheus
import (
"context"
"fmt"
"testing"
@@ -89,7 +90,7 @@ func TestAddBearerToken(t *testing.T) {
}
key := fmt.Sprintf("basicauth/%d", i)
err := store.addBearerToken(tc.ns, sel, key)
err := store.addBearerToken(context.Background(), tc.ns, sel, key)
if tc.err {
if err == nil {
@@ -220,7 +221,7 @@ func TestAddBasicAuth(t *testing.T) {
}
key := fmt.Sprintf("basicauth/%d", i)
err := store.addBasicAuth(tc.ns, basicAuth, key)
err := store.addBasicAuth(context.Background(), tc.ns, basicAuth, key)
if tc.err {
if err == nil {
@@ -564,7 +565,7 @@ func TestAddTLSConfig(t *testing.T) {
t.Run("", func(t *testing.T) {
store := newAssetStore(c.CoreV1(), c.CoreV1())
err := store.addTLSConfig(tc.ns, tc.tlsConfig)
err := store.addTLSConfig(context.Background(), tc.ns, tc.tlsConfig)
if tc.err {
if err == nil {

View file

@@ -178,7 +178,7 @@ type BearerToken string
type TLSAsset string
// New creates a new controller.
func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
func New(ctx context.Context, conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
cfg, err := k8sutil.NewClusterConfig(conf.Host, conf.TLSInsecure, &conf.TLSConfig)
if err != nil {
return nil, errors.Wrap(err, "instantiating cluster config failed")
@@ -254,11 +254,11 @@ func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, er
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = c.config.PromSelector
return mclient.MonitoringV1().Prometheuses(namespace).List(context.TODO(), options)
return mclient.MonitoringV1().Prometheuses(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = c.config.PromSelector
return mclient.MonitoringV1().Prometheuses(namespace).Watch(context.TODO(), options)
return mclient.MonitoringV1().Prometheuses(namespace).Watch(ctx, options)
},
}
}),
@@ -273,10 +273,10 @@ func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, er
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().ServiceMonitors(namespace).List(context.TODO(), options)
return mclient.MonitoringV1().ServiceMonitors(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().ServiceMonitors(namespace).Watch(context.TODO(), options)
return mclient.MonitoringV1().ServiceMonitors(namespace).Watch(ctx, options)
},
}
}),
@@ -290,10 +290,10 @@ func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, er
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().PodMonitors(namespace).List(context.TODO(), options)
return mclient.MonitoringV1().PodMonitors(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().PodMonitors(namespace).Watch(context.TODO(), options)
return mclient.MonitoringV1().PodMonitors(namespace).Watch(ctx, options)
},
}
}),
@@ -307,10 +307,10 @@ func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, er
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, err error) {
return mclient.MonitoringV1().Probes(namespace).List(context.TODO(), options)
return mclient.MonitoringV1().Probes(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (w watch.Interface, err error) {
return mclient.MonitoringV1().Probes(namespace).Watch(context.TODO(), options)
return mclient.MonitoringV1().Probes(namespace).Watch(ctx, options)
},
}
}),
@@ -324,10 +324,10 @@ func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, er
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).List(context.TODO(), options)
return mclient.MonitoringV1().PrometheusRules(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).Watch(context.TODO(), options)
return mclient.MonitoringV1().PrometheusRules(namespace).Watch(ctx, options)
},
}
}),
@@ -342,11 +342,11 @@ func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, er
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = labelPrometheusName
return c.kclient.CoreV1().ConfigMaps(namespace).List(context.TODO(), options)
return c.kclient.CoreV1().ConfigMaps(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelPrometheusName
return c.kclient.CoreV1().ConfigMaps(namespace).Watch(context.TODO(), options)
return c.kclient.CoreV1().ConfigMaps(namespace).Watch(ctx, options)
},
}
}),
@@ -404,7 +404,7 @@ func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, er
}
// waitForCacheSync waits for the informers' caches to be synced.
func (c *Operator) waitForCacheSync(stopc <-chan struct{}) error {
func (c *Operator) waitForCacheSync(ctx context.Context) error {
ok := true
informers := []struct {
name string
@@ -422,7 +422,7 @@ func (c *Operator) waitForCacheSync(stopc <-chan struct{}) error {
{"MonNamespace", c.nsMonInf},
}
for _, inf := range informers {
if !cache.WaitForCacheSync(stopc, inf.informer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), inf.informer.HasSynced) {
level.Error(c.logger).Log("msg", fmt.Sprintf("failed to sync %s cache", inf.name))
ok = false
} else {
@@ -481,7 +481,7 @@ func (c *Operator) addHandlers() {
}
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
func (c *Operator) Run(ctx context.Context) error {
defer c.queue.ShutDown()
errChan := make(chan error)
@@ -501,34 +501,34 @@ func (c *Operator) Run(stopc <-chan struct{}) error {
return err
}
level.Info(c.logger).Log("msg", "CRD API endpoints ready")
case <-stopc:
case <-ctx.Done():
return nil
}
go c.worker()
go c.worker(ctx)
go c.promInf.Run(stopc)
go c.smonInf.Run(stopc)
go c.pmonInf.Run(stopc)
go c.probeInf.Run(stopc)
go c.ruleInf.Run(stopc)
go c.cmapInf.Run(stopc)
go c.secrInf.Run(stopc)
go c.ssetInf.Run(stopc)
go c.nsMonInf.Run(stopc)
go c.promInf.Run(ctx.Done())
go c.smonInf.Run(ctx.Done())
go c.pmonInf.Run(ctx.Done())
go c.probeInf.Run(ctx.Done())
go c.ruleInf.Run(ctx.Done())
go c.cmapInf.Run(ctx.Done())
go c.secrInf.Run(ctx.Done())
go c.ssetInf.Run(ctx.Done())
go c.nsMonInf.Run(ctx.Done())
if c.nsPromInf != c.nsMonInf {
go c.nsPromInf.Run(stopc)
go c.nsPromInf.Run(ctx.Done())
}
if err := c.waitForCacheSync(stopc); err != nil {
if err := c.waitForCacheSync(ctx); err != nil {
return err
}
c.addHandlers()
if c.kubeletSyncEnabled {
go c.reconcileNodeEndpoints(stopc)
go c.reconcileNodeEndpoints(ctx)
}
<-stopc
<-ctx.Done()
return nil
}
@@ -580,16 +580,16 @@ func (c *Operator) handlePrometheusUpdate(old, cur interface{}) {
c.enqueue(key)
}
func (c *Operator) reconcileNodeEndpoints(stopc <-chan struct{}) {
c.syncNodeEndpointsWithLogError()
func (c *Operator) reconcileNodeEndpoints(ctx context.Context) {
c.syncNodeEndpointsWithLogError(ctx)
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
for {
select {
case <-stopc:
case <-ctx.Done():
return
case <-ticker.C:
c.syncNodeEndpointsWithLogError()
c.syncNodeEndpointsWithLogError(ctx)
}
}
}
@@ -638,16 +638,16 @@ func getNodeAddresses(nodes *v1.NodeList) ([]v1.EndpointAddress, []error) {
return addresses, errs
}
func (c *Operator) syncNodeEndpointsWithLogError() {
func (c *Operator) syncNodeEndpointsWithLogError(ctx context.Context) {
c.nodeEndpointSyncs.Inc()
err := c.syncNodeEndpoints()
err := c.syncNodeEndpoints(ctx)
if err != nil {
c.nodeEndpointSyncErrors.Inc()
level.Error(c.logger).Log("msg", "syncing nodes into Endpoints object failed", "err", err)
}
}
func (c *Operator) syncNodeEndpoints() error {
func (c *Operator) syncNodeEndpoints(ctx context.Context) error {
eps := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: c.kubeletObjectName,
@@ -675,7 +675,7 @@ func (c *Operator) syncNodeEndpoints() error {
},
}
nodes, err := c.kclient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
nodes, err := c.kclient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return errors.Wrap(err, "listing nodes failed")
}
@@ -716,12 +716,12 @@ func (c *Operator) syncNodeEndpoints() error {
},
}
err = k8sutil.CreateOrUpdateService(c.kclient.CoreV1().Services(c.kubeletObjectNamespace), svc)
err = k8sutil.CreateOrUpdateService(ctx, c.kclient.CoreV1().Services(c.kubeletObjectNamespace), svc)
if err != nil {
return errors.Wrap(err, "synchronizing kubelet service object failed")
}
err = k8sutil.CreateOrUpdateEndpoints(c.kclient.CoreV1().Endpoints(c.kubeletObjectNamespace), eps)
err = k8sutil.CreateOrUpdateEndpoints(ctx, c.kclient.CoreV1().Endpoints(c.kubeletObjectNamespace), eps)
if err != nil {
return errors.Wrap(err, "synchronizing kubelet endpoints object failed")
}
@@ -1080,12 +1080,12 @@ func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) {
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. It enforces that the syncHandler is never invoked
// concurrently with the same key.
func (c *Operator) worker() {
for c.processNextWorkItem() {
func (c *Operator) worker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Operator) processNextWorkItem() bool {
func (c *Operator) processNextWorkItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
@@ -1093,7 +1093,7 @@ func (c *Operator) processNextWorkItem() bool {
defer c.queue.Done(key)
c.metrics.ReconcileCounter().Inc()
err := c.sync(key.(string))
err := c.sync(ctx, key.(string))
if err == nil {
c.queue.Forget(key)
return true
@@ -1181,7 +1181,7 @@ func (c *Operator) handleStatefulSetUpdate(oldo, curo interface{}) {
}
}
func (c *Operator) sync(key string) error {
func (c *Operator) sync(ctx context.Context, key string) error {
obj, exists, err := c.promInf.GetIndexer().GetByKey(key)
if err != nil {
return err
@@ -1201,24 +1201,24 @@ func (c *Operator) sync(key string) error {
}
level.Info(c.logger).Log("msg", "sync prometheus", "key", key)
ruleConfigMapNames, err := c.createOrUpdateRuleConfigMaps(p)
ruleConfigMapNames, err := c.createOrUpdateRuleConfigMaps(ctx, p)
if err != nil {
return err
}
assetStore := newAssetStore(c.kclient.CoreV1(), c.kclient.CoreV1())
if err := c.createOrUpdateConfigurationSecret(p, ruleConfigMapNames, assetStore); err != nil {
if err := c.createOrUpdateConfigurationSecret(ctx, p, ruleConfigMapNames, assetStore); err != nil {
return errors.Wrap(err, "creating config failed")
}
if err := c.createOrUpdateTLSAssetSecret(p, assetStore); err != nil {
if err := c.createOrUpdateTLSAssetSecret(ctx, p, assetStore); err != nil {
return errors.Wrap(err, "creating tls asset secret failed")
}
// Create governing service if it doesn't exist.
svcClient := c.kclient.CoreV1().Services(p.Namespace)
if err := k8sutil.CreateOrUpdateService(svcClient, makeStatefulSetService(p, c.config)); err != nil {
if err := k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(p, c.config)); err != nil {
return errors.Wrap(err, "synchronizing governing service failed")
}
@@ -1248,7 +1248,7 @@ func (c *Operator) sync(key string) error {
if !exists {
level.Debug(c.logger).Log("msg", "no current Prometheus statefulset found")
level.Debug(c.logger).Log("msg", "creating Prometheus statefulset")
if _, err := ssetClient.Create(context.TODO(), sset, metav1.CreateOptions{}); err != nil {
if _, err := ssetClient.Create(ctx, sset, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "creating statefulset failed")
}
return nil
@@ -1262,14 +1262,14 @@ func (c *Operator) sync(key string) error {
level.Debug(c.logger).Log("msg", "updating current Prometheus statefulset")
_, err = ssetClient.Update(context.TODO(), sset, metav1.UpdateOptions{})
_, err = ssetClient.Update(ctx, sset, metav1.UpdateOptions{})
sErr, ok := err.(*apierrors.StatusError)
if ok && sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {
c.metrics.StsDeleteCreateCounter().Inc()
level.Info(c.logger).Log("msg", "resolving illegal update of Prometheus StatefulSet", "details", sErr.ErrStatus.Details)
propagationPolicy := metav1.DeletePropagationForeground
if err := ssetClient.Delete(context.TODO(), sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
if err := ssetClient.Delete(ctx, sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
return errors.Wrap(err, "failed to delete StatefulSet to avoid forbidden action")
}
return nil
@@ -1343,14 +1343,14 @@ func ListOptions(name string) metav1.ListOptions {
// PrometheusStatus evaluates the current status of a Prometheus deployment with
// respect to its specified resource object. It returns the status and a list of
// pods that are not updated.
func PrometheusStatus(kclient kubernetes.Interface, p *monitoringv1.Prometheus) (*monitoringv1.PrometheusStatus, []v1.Pod, error) {
func PrometheusStatus(ctx context.Context, kclient kubernetes.Interface, p *monitoringv1.Prometheus) (*monitoringv1.PrometheusStatus, []v1.Pod, error) {
res := &monitoringv1.PrometheusStatus{Paused: p.Spec.Paused}
pods, err := kclient.CoreV1().Pods(p.Namespace).List(context.TODO(), ListOptions(p.Name))
pods, err := kclient.CoreV1().Pods(p.Namespace).List(ctx, ListOptions(p.Name))
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving pods of failed")
}
sset, err := kclient.AppsV1().StatefulSets(p.Namespace).Get(context.TODO(), statefulSetNameFromPrometheusName(p.Name), metav1.GetOptions{})
sset, err := kclient.AppsV1().StatefulSets(p.Namespace).Get(ctx, statefulSetNameFromPrometheusName(p.Name), metav1.GetOptions{})
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving stateful set failed")
}
@@ -1424,7 +1424,7 @@ func gzipConfig(buf *bytes.Buffer, conf []byte) error {
return nil
}
func (c *Operator) createOrUpdateConfigurationSecret(p *monitoringv1.Prometheus, ruleConfigMapNames []string, store *assetStore) error {
func (c *Operator) createOrUpdateConfigurationSecret(ctx context.Context, p *monitoringv1.Prometheus, ruleConfigMapNames []string, store *assetStore) error {
// If no service or pod monitor selectors are configured, the user wants to
// manage configuration themselves. Do create an empty Secret if it doesn't
// exist.
@@ -1437,9 +1437,9 @@ func (c *Operator) createOrUpdateConfigurationSecret(p *monitoringv1.Prometheus,
return errors.Wrap(err, "generating empty config secret failed")
}
sClient := c.kclient.CoreV1().Secrets(p.Namespace)
_, err = sClient.Get(context.TODO(), s.Name, metav1.GetOptions{})
_, err = sClient.Get(ctx, s.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
if _, err := c.kclient.CoreV1().Secrets(p.Namespace).Create(context.TODO(), s, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
if _, err := c.kclient.CoreV1().Secrets(p.Namespace).Create(ctx, s, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrap(err, "creating empty config file failed")
}
}
@@ -1450,7 +1450,7 @@ func (c *Operator) createOrUpdateConfigurationSecret(p *monitoringv1.Prometheus,
return nil
}
smons, err := c.selectServiceMonitors(p, store)
smons, err := c.selectServiceMonitors(ctx, p, store)
if err != nil {
return errors.Wrap(err, "selecting ServiceMonitors failed")
}
@@ -1466,25 +1466,25 @@ func (c *Operator) createOrUpdateConfigurationSecret(p *monitoringv1.Prometheus,
}
sClient := c.kclient.CoreV1().Secrets(p.Namespace)
SecretsInPromNS, err := sClient.List(context.TODO(), metav1.ListOptions{})
SecretsInPromNS, err := sClient.List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for i, remote := range p.Spec.RemoteRead {
if err := store.addBasicAuth(p.GetNamespace(), remote.BasicAuth, fmt.Sprintf("remoteRead/%d", i)); err != nil {
if err := store.addBasicAuth(ctx, p.GetNamespace(), remote.BasicAuth, fmt.Sprintf("remoteRead/%d", i)); err != nil {
return errors.Wrapf(err, "remote read %d", i)
}
}
for i, remote := range p.Spec.RemoteWrite {
if err := store.addBasicAuth(p.GetNamespace(), remote.BasicAuth, fmt.Sprintf("remoteWrite/%d", i)); err != nil {
if err := store.addBasicAuth(ctx, p.GetNamespace(), remote.BasicAuth, fmt.Sprintf("remoteWrite/%d", i)); err != nil {
return errors.Wrapf(err, "remote write %d", i)
}
}
if p.Spec.APIServerConfig != nil {
if err := store.addBasicAuth(p.GetNamespace(), p.Spec.APIServerConfig.BasicAuth, "apiserver"); err != nil {
if err := store.addBasicAuth(ctx, p.GetNamespace(), p.Spec.APIServerConfig.BasicAuth, "apiserver"); err != nil {
return errors.Wrap(err, "apiserver config")
}
}
@@ -1531,10 +1531,10 @@ func (c *Operator) createOrUpdateConfigurationSecret(p *monitoringv1.Prometheus,
}
s.Data[configFilename] = buf.Bytes()
curSecret, err := sClient.Get(context.TODO(), s.Name, metav1.GetOptions{})
curSecret, err := sClient.Get(ctx, s.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
level.Debug(c.logger).Log("msg", "creating configuration")
_, err = sClient.Create(context.TODO(), s, metav1.CreateOptions{})
_, err = sClient.Create(ctx, s, metav1.CreateOptions{})
return err
}
@@ -1553,11 +1553,11 @@ func (c *Operator) createOrUpdateConfigurationSecret(p *monitoringv1.Prometheus,
}
level.Debug(c.logger).Log("msg", "updating Prometheus configuration secret")
_, err = sClient.Update(context.TODO(), s, metav1.UpdateOptions{})
_, err = sClient.Update(ctx, s, metav1.UpdateOptions{})
return err
}
func (c *Operator) createOrUpdateTLSAssetSecret(p *monitoringv1.Prometheus, store *assetStore) error {
func (c *Operator) createOrUpdateTLSAssetSecret(ctx context.Context, p *monitoringv1.Prometheus, store *assetStore) error {
boolTrue := true
sClient := c.kclient.CoreV1().Secrets(p.Namespace)
@@ -1580,7 +1580,7 @@ func (c *Operator) createOrUpdateTLSAssetSecret(p *monitoringv1.Prometheus, stor
}
for i, rw := range p.Spec.RemoteWrite {
if err := store.addTLSConfig(p.GetNamespace(), rw.TLSConfig); err != nil {
if err := store.addTLSConfig(ctx, p.GetNamespace(), rw.TLSConfig); err != nil {
return errors.Wrapf(err, "remote write %d", i)
}
}
@@ -1589,7 +1589,7 @@ func (c *Operator) createOrUpdateTLSAssetSecret(p *monitoringv1.Prometheus, stor
tlsAssetsSecret.Data[key.String()] = []byte(asset)
}
_, err := sClient.Get(context.TODO(), tlsAssetsSecret.Name, metav1.GetOptions{})
_, err := sClient.Get(ctx, tlsAssetsSecret.Name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(
@@ -1599,11 +1599,11 @@ func (c *Operator) createOrUpdateTLSAssetSecret(p *monitoringv1.Prometheus, stor
p.Namespace,
)
}
_, err = sClient.Create(context.TODO(), tlsAssetsSecret, metav1.CreateOptions{})
_, err = sClient.Create(ctx, tlsAssetsSecret, metav1.CreateOptions{})
level.Debug(c.logger).Log("msg", "created tlsAssetsSecret", "secretname", tlsAssetsSecret.Name)
} else {
_, err = sClient.Update(context.TODO(), tlsAssetsSecret, metav1.UpdateOptions{})
_, err = sClient.Update(ctx, tlsAssetsSecret, metav1.UpdateOptions{})
level.Debug(c.logger).Log("msg", "updated tlsAssetsSecret", "secretname", tlsAssetsSecret.Name)
}
@@ -1614,7 +1614,7 @@ func (c *Operator) createOrUpdateTLSAssetSecret(p *monitoringv1.Prometheus, stor
return nil
}
func (c *Operator) selectServiceMonitors(p *monitoringv1.Prometheus, store *assetStore) (map[string]*monitoringv1.ServiceMonitor, error) {
func (c *Operator) selectServiceMonitors(ctx context.Context, p *monitoringv1.Prometheus, store *assetStore) (map[string]*monitoringv1.ServiceMonitor, error) {
namespaces := []string{}
// Selectors (<namespace>/<name>) might overlap. Deduplicate them along the keyFunc.
serviceMonitors := make(map[string]*monitoringv1.ServiceMonitor)
@@ -1665,15 +1665,15 @@ func (c *Operator) selectServiceMonitors(p *monitoringv1.Prometheus, store *asse
smKey := fmt.Sprintf("serviceMonitor/%s/%s/%d", sm.GetNamespace(), sm.GetName(), i)
if err = store.addBearerToken(sm.GetNamespace(), endpoint.BearerTokenSecret, smKey); err != nil {
if err = store.addBearerToken(ctx, sm.GetNamespace(), endpoint.BearerTokenSecret, smKey); err != nil {
break
}
if err = store.addBasicAuth(sm.GetNamespace(), endpoint.BasicAuth, smKey); err != nil {
if err = store.addBasicAuth(ctx, sm.GetNamespace(), endpoint.BasicAuth, smKey); err != nil {
break
}
if err = store.addTLSConfig(sm.GetNamespace(), endpoint.TLSConfig); err != nil {
if err = store.addTLSConfig(ctx, sm.GetNamespace(), endpoint.TLSConfig); err != nil {
break
}
}
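The sync paths above share one more idiom: when a StatefulSet update is rejected as invalid (HTTP 422), the operator deletes the object with foreground propagation and lets the next reconciliation recreate it. A sketch of that error-handling shape (fake clientset, which returns NotFound rather than 422, so this only demonstrates the structure):

package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()
	ssetClient := fake.NewSimpleClientset().AppsV1().StatefulSets("default")

	sset := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: "demo"}}
	_, err := ssetClient.Update(ctx, sset, metav1.UpdateOptions{})

	// If the update is forbidden as invalid, delete the StatefulSet and let
	// the next sync recreate it from scratch.
	if sErr, ok := err.(*apierrors.StatusError); ok &&
		sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {
		propagation := metav1.DeletePropagationForeground
		if err := ssetClient.Delete(ctx, sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil {
			fmt.Println("delete failed:", err)
		}
		return
	}
	fmt.Println("update result:", err)
}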

View file

@@ -42,7 +42,7 @@ const labelPrometheusName = "prometheus-name"
// large buffer.
var maxConfigMapDataSize = int(float64(v1.MaxSecretSize) * 0.5)
func (c *Operator) createOrUpdateRuleConfigMaps(p *monitoringv1.Prometheus) ([]string, error) {
func (c *Operator) createOrUpdateRuleConfigMaps(ctx context.Context, p *monitoringv1.Prometheus) ([]string, error) {
cClient := c.kclient.CoreV1().ConfigMaps(p.Namespace)
namespaces, err := c.selectRuleNamespaces(p)
@@ -55,7 +55,7 @@ func (c *Operator) createOrUpdateRuleConfigMaps(p *monitoringv1.Prometheus) ([]s
return nil, err
}
currentConfigMapList, err := cClient.List(context.TODO(), prometheusRulesConfigMapSelector(p.Name))
currentConfigMapList, err := cClient.List(ctx, prometheusRulesConfigMapSelector(p.Name))
if err != nil {
return nil, err
}
@@ -99,7 +99,7 @@ func (c *Operator) createOrUpdateRuleConfigMaps(p *monitoringv1.Prometheus) ([]s
"prometheus", p.Name,
)
for _, cm := range newConfigMaps {
_, err = cClient.Create(context.TODO(), &cm, metav1.CreateOptions{})
_, err = cClient.Create(ctx, &cm, metav1.CreateOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to create ConfigMap '%v'", cm.Name)
}
@@ -110,7 +110,7 @@ func (c *Operator) createOrUpdateRuleConfigMaps(p *monitoringv1.Prometheus) ([]s
// Simply deleting old ConfigMaps and creating new ones for now. Could be
// replaced by logic that only deletes obsolete ConfigMaps in the future.
for _, cm := range currentConfigMaps {
err := cClient.Delete(context.TODO(), cm.Name, metav1.DeleteOptions{})
err := cClient.Delete(ctx, cm.Name, metav1.DeleteOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to delete current ConfigMap '%v'", cm.Name)
}
@@ -122,7 +122,7 @@ func (c *Operator) createOrUpdateRuleConfigMaps(p *monitoringv1.Prometheus) ([]s
"prometheus", p.Name,
)
for _, cm := range newConfigMaps {
_, err = cClient.Create(context.TODO(), &cm, metav1.CreateOptions{})
_, err = cClient.Create(ctx, &cm, metav1.CreateOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to create new ConfigMap '%v'", cm.Name)
}
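As the comment above notes, rule reconciliation currently deletes the old ConfigMaps and creates new ones rather than diffing them. A runnable sketch of that list/delete/create cycle with ctx on every call (fake clientset; label and names are illustrative):

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()
	client := fake.NewSimpleClientset(&v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rules-0",
			Namespace: "default",
			Labels:    map[string]string{"prometheus-name": "demo"},
		},
	})
	cClient := client.CoreV1().ConfigMaps("default")

	// List the rule ConfigMaps currently owned by this instance...
	current, err := cClient.List(ctx, metav1.ListOptions{LabelSelector: "prometheus-name=demo"})
	if err != nil {
		panic(err)
	}
	// ...delete them, then create the regenerated ones, with ctx throughout.
	for _, cm := range current.Items {
		if err := cClient.Delete(ctx, cm.Name, metav1.DeleteOptions{}); err != nil {
			panic(err)
		}
	}
	regenerated := v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "rules-0", Namespace: "default"}}
	if _, err := cClient.Create(ctx, &regenerated, metav1.CreateOptions{}); err != nil {
		panic(err)
	}
	fmt.Println("rule ConfigMaps recreated:", len(current.Items))
}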

View file

@@ -94,7 +94,7 @@ type Config struct {
}
// New creates a new controller.
func New(conf prometheusoperator.Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
func New(ctx context.Context, conf prometheusoperator.Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
cfg, err := k8sutil.NewClusterConfig(conf.Host, conf.TLSInsecure, &conf.TLSConfig)
if err != nil {
return nil, errors.Wrap(err, "instantiating cluster config failed")
@@ -143,11 +143,11 @@ func New(conf prometheusoperator.Config, logger log.Logger, r prometheus.Registe
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = labelThanosRulerName
return o.kclient.CoreV1().ConfigMaps(namespace).List(context.TODO(), options)
return o.kclient.CoreV1().ConfigMaps(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelThanosRulerName
return o.kclient.CoreV1().ConfigMaps(namespace).Watch(context.TODO(), options)
return o.kclient.CoreV1().ConfigMaps(namespace).Watch(ctx, options)
},
}
}),
@@ -161,11 +161,11 @@ func New(conf prometheusoperator.Config, logger log.Logger, r prometheus.Registe
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = o.config.ThanosRulerSelector
return o.mclient.MonitoringV1().ThanosRulers(namespace).List(context.TODO(), options)
return o.mclient.MonitoringV1().ThanosRulers(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = o.config.ThanosRulerSelector
return o.mclient.MonitoringV1().ThanosRulers(namespace).Watch(context.TODO(), options)
return o.mclient.MonitoringV1().ThanosRulers(namespace).Watch(ctx, options)
},
}
}),
@@ -181,10 +181,10 @@ func New(conf prometheusoperator.Config, logger log.Logger, r prometheus.Registe
listwatch.MultiNamespaceListerWatcher(o.logger, o.config.Namespaces.AllowList, o.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).List(context.TODO(), options)
return mclient.MonitoringV1().PrometheusRules(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).Watch(context.TODO(), options)
return mclient.MonitoringV1().PrometheusRules(namespace).Watch(ctx, options)
},
}
}),
@@ -285,7 +285,7 @@ func (o *Operator) addHandlers() {
}
// Run the controller.
func (o *Operator) Run(stopc <-chan struct{}) error {
func (o *Operator) Run(ctx context.Context) error {
defer o.queue.ShutDown()
errChan := make(chan error)
@@ -305,26 +305,26 @@ func (o *Operator) Run(stopc <-chan struct{}) error {
return err
}
level.Info(o.logger).Log("msg", "CRD API endpoints ready")
case <-stopc:
case <-ctx.Done():
return nil
}
go o.worker()
go o.worker(ctx)
go o.thanosRulerInf.Run(stopc)
go o.cmapInf.Run(stopc)
go o.ruleInf.Run(stopc)
go o.nsRuleInf.Run(stopc)
go o.thanosRulerInf.Run(ctx.Done())
go o.cmapInf.Run(ctx.Done())
go o.ruleInf.Run(ctx.Done())
go o.nsRuleInf.Run(ctx.Done())
if o.nsRuleInf != o.nsThanosRulerInf {
go o.nsThanosRulerInf.Run(stopc)
go o.nsThanosRulerInf.Run(ctx.Done())
}
go o.ssetInf.Run(stopc)
if err := o.waitForCacheSync(stopc); err != nil {
go o.ssetInf.Run(ctx.Done())
if err := o.waitForCacheSync(ctx.Done()); err != nil {
return err
}
o.addHandlers()
<-stopc
<-ctx.Done()
return nil
}
@@ -542,12 +542,12 @@ func (o *Operator) enqueue(obj interface{}) {
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. It enforces that the syncHandler is never invoked
// concurrently with the same key.
func (o *Operator) worker() {
for o.processNextWorkItem() {
func (o *Operator) worker(ctx context.Context) {
for o.processNextWorkItem(ctx) {
}
}
func (o *Operator) processNextWorkItem() bool {
func (o *Operator) processNextWorkItem(ctx context.Context) bool {
key, quit := o.queue.Get()
if quit {
return false
@@ -555,7 +555,7 @@ func (o *Operator) processNextWorkItem() bool {
defer o.queue.Done(key)
o.metrics.ReconcileCounter().Inc()
err := o.sync(key.(string))
err := o.sync(ctx, key.(string))
if err == nil {
o.queue.Forget(key)
return true
@@ -568,7 +568,7 @@ func (o *Operator) processNextWorkItem() bool {
return true
}
func (o *Operator) sync(key string) error {
func (o *Operator) sync(ctx context.Context, key string) error {
obj, exists, err := o.thanosRulerInf.GetIndexer().GetByKey(key)
if err != nil {
return err
@@ -589,14 +589,14 @@ func (o *Operator) sync(key string) error {
level.Info(o.logger).Log("msg", "sync thanos-ruler", "key", key)
ruleConfigMapNames, err := o.createOrUpdateRuleConfigMaps(tr)
ruleConfigMapNames, err := o.createOrUpdateRuleConfigMaps(ctx, tr)
if err != nil {
return err
}
// Create governing service if it doesn't exist.
svcClient := o.kclient.CoreV1().Services(tr.Namespace)
if err = k8sutil.CreateOrUpdateService(svcClient, makeStatefulSetService(tr, o.config)); err != nil {
if err = k8sutil.CreateOrUpdateService(ctx, svcClient, makeStatefulSetService(tr, o.config)); err != nil {
return errors.Wrap(err, "synchronizing governing service failed")
}
@@ -613,7 +613,7 @@ func (o *Operator) sync(key string) error {
return errors.Wrap(err, "making thanos statefulset config failed")
}
operator.SanitizeSTS(sset)
if _, err := ssetClient.Create(context.TODO(), sset, metav1.CreateOptions{}); err != nil {
if _, err := ssetClient.Create(ctx, sset, metav1.CreateOptions{}); err != nil {
return errors.Wrap(err, "creating thanos statefulset failed")
}
return nil
@@ -643,14 +643,14 @@ func (o *Operator) sync(key string) error {
return nil
}
_, err = ssetClient.Update(context.TODO(), sset, metav1.UpdateOptions{})
_, err = ssetClient.Update(ctx, sset, metav1.UpdateOptions{})
sErr, ok := err.(*apierrors.StatusError)
if ok && sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {
o.metrics.StsDeleteCreateCounter().Inc()
level.Info(o.logger).Log("msg", "resolving illegal update of ThanosRuler StatefulSet", "details", sErr.ErrStatus.Details)
propagationPolicy := metav1.DeletePropagationForeground
if err := ssetClient.Delete(context.TODO(), sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
if err := ssetClient.Delete(ctx, sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
return errors.Wrap(err, "failed to delete StatefulSet to avoid forbidden action")
}
return nil
@@ -708,14 +708,14 @@ func ListOptions(name string) metav1.ListOptions {
// ThanosRulerStatus evaluates the current status of a ThanosRuler deployment with
// respect to its specified resource object. It returns the status and a list of
// pods that are not updated.
func ThanosRulerStatus(kclient kubernetes.Interface, tr *monitoringv1.ThanosRuler) (*monitoringv1.ThanosRulerStatus, []v1.Pod, error) {
func ThanosRulerStatus(ctx context.Context, kclient kubernetes.Interface, tr *monitoringv1.ThanosRuler) (*monitoringv1.ThanosRulerStatus, []v1.Pod, error) {
res := &monitoringv1.ThanosRulerStatus{Paused: tr.Spec.Paused}
pods, err := kclient.CoreV1().Pods(tr.Namespace).List(context.TODO(), ListOptions(tr.Name))
pods, err := kclient.CoreV1().Pods(tr.Namespace).List(ctx, ListOptions(tr.Name))
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving pods of failed")
}
sset, err := kclient.AppsV1().StatefulSets(tr.Namespace).Get(context.TODO(), statefulSetNameFromThanosName(tr.Name), metav1.GetOptions{})
sset, err := kclient.AppsV1().StatefulSets(tr.Namespace).Get(ctx, statefulSetNameFromThanosName(tr.Name), metav1.GetOptions{})
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving stateful set failed")
}

View file

@@ -42,7 +42,7 @@ const labelThanosRulerName = "thanos-ruler-name"
// large buffer.
var maxConfigMapDataSize = int(float64(v1.MaxSecretSize) * 0.5)
func (o *Operator) createOrUpdateRuleConfigMaps(t *monitoringv1.ThanosRuler) ([]string, error) {
func (o *Operator) createOrUpdateRuleConfigMaps(ctx context.Context, t *monitoringv1.ThanosRuler) ([]string, error) {
cClient := o.kclient.CoreV1().ConfigMaps(t.Namespace)
namespaces, err := o.selectRuleNamespaces(t)
@@ -55,7 +55,7 @@ func (o *Operator) createOrUpdateRuleConfigMaps(t *monitoringv1.ThanosRuler) ([]
return nil, err
}
currentConfigMapList, err := cClient.List(context.TODO(), prometheusRulesConfigMapSelector(t.Name))
currentConfigMapList, err := cClient.List(ctx, prometheusRulesConfigMapSelector(t.Name))
if err != nil {
return nil, err
}
@@ -99,7 +99,7 @@ func (o *Operator) createOrUpdateRuleConfigMaps(t *monitoringv1.ThanosRuler) ([]
"thanos", t.Name,
)
for _, cm := range newConfigMaps {
_, err = cClient.Create(context.TODO(), &cm, metav1.CreateOptions{})
_, err = cClient.Create(ctx, &cm, metav1.CreateOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to create ConfigMap '%v'", cm.Name)
}
@@ -110,7 +110,7 @@ func (o *Operator) createOrUpdateRuleConfigMaps(t *monitoringv1.ThanosRuler) ([]
// Simply deleting old ConfigMaps and creating new ones for now. Could be
// replaced by logic that only deletes obsolete ConfigMaps in the future.
for _, cm := range currentConfigMaps {
err := cClient.Delete(context.TODO(), cm.Name, metav1.DeleteOptions{})
err := cClient.Delete(ctx, cm.Name, metav1.DeleteOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to delete current ConfigMap '%v'", cm.Name)
}
@@ -122,7 +122,7 @@ func (o *Operator) createOrUpdateRuleConfigMaps(t *monitoringv1.ThanosRuler) ([]
"thanos", t.Name,
)
for _, cm := range newConfigMaps {
_, err = cClient.Create(context.TODO(), &cm, metav1.CreateOptions{})
_, err = cClient.Create(ctx, &cm, metav1.CreateOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to create new ConfigMap '%v'", cm.Name)
}

View file

@@ -30,10 +30,10 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/pkg/errors"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
"github.com/prometheus-operator/prometheus-operator/pkg/prometheus"
"github.com/pkg/errors"
)
const (
@@ -266,7 +266,7 @@ func (f *Framework) WaitForPrometheusReady(p *monitoringv1.Prometheus, timeout t
var pollErr error
err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
st, _, pollErr := prometheus.PrometheusStatus(f.KubeClient, p)
st, _, pollErr := prometheus.PrometheusStatus(context.Background(), f.KubeClient, p)
if pollErr != nil {
return false, nil

View file

@@ -22,9 +22,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/pkg/errors"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/prometheus-operator/prometheus-operator/pkg/thanos"
"github.com/pkg/errors"
)
func (f *Framework) MakeBasicThanosRuler(name string, replicas int32) *monitoringv1.ThanosRuler {
@@ -69,7 +69,7 @@ func (f *Framework) WaitForThanosRulerReady(tr *monitoringv1.ThanosRuler, timeou
var pollErr error
err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
st, _, pollErr := thanos.ThanosRulerStatus(f.KubeClient, tr)
st, _, pollErr := thanos.ThanosRulerStatus(context.Background(), f.KubeClient, tr)
if pollErr != nil {
return false, nil