// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alertmanager

import (
"fmt"
"reflect"
"strings"
"time"
"github.com/coreos/prometheus-operator/pkg/client/monitoring"
monitoringv1 "github.com/coreos/prometheus-operator/pkg/client/monitoring/v1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
prometheusoperator "github.com/coreos/prometheus-operator/pkg/prometheus"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
appsv1 "k8s.io/api/apps/v1beta2"
"k8s.io/api/core/v1"
extensionsobj "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

const (
	resyncPeriod = 5 * time.Minute
)

// Operator manages the life cycle of Alertmanager deployments and
// monitoring configurations.
type Operator struct {
kclient kubernetes.Interface
mclient monitoring.Interface
crdclient apiextensionsclient.Interface
logger log.Logger
alrtInf cache.SharedIndexInformer
ssetInf cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
statefulsetErrors prometheus.Counter
config Config
}
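
// Config defines configuration parameters for the Operator.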
type Config struct {
Host string
LocalHost string
ConfigReloaderImage string
AlertmanagerDefaultBaseImage string
Namespace string
Labels prometheusoperator.Labels
CrdKinds monitoringv1.CrdKinds
CrdGroup string
EnableValidation bool
DisableAutoUserGroup bool
ManageCRDs bool
}

// New creates a new controller.
func New(c prometheusoperator.Config, logger log.Logger) (*Operator, error) {
cfg, err := k8sutil.NewClusterConfig(c.Host, c.TLSInsecure, &c.TLSConfig)
if err != nil {
return nil, errors.Wrap(err, "instantiating cluster config failed")
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, errors.Wrap(err, "instantiating kubernetes client failed")
}
mclient, err := monitoring.NewForConfig(&c.CrdKinds, c.CrdGroup, cfg)
if err != nil {
return nil, errors.Wrap(err, "instantiating monitoring client failed")
}
crdclient, err := apiextensionsclient.NewForConfig(cfg)
if err != nil {
return nil, errors.Wrap(err, "instantiating apiextensions client failed")
}
o := &Operator{
kclient: client,
mclient: mclient,
crdclient: crdclient,
logger: logger,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "alertmanager"),
config: Config{
Host: c.Host,
LocalHost: c.LocalHost,
ConfigReloaderImage: c.ConfigReloaderImage,
AlertmanagerDefaultBaseImage: c.AlertmanagerDefaultBaseImage,
Namespace: c.Namespace,
CrdGroup: c.CrdGroup,
CrdKinds: c.CrdKinds,
Labels: c.Labels,
EnableValidation: c.EnableValidation,
DisableAutoUserGroup: c.DisableAutoUserGroup,
ManageCRDs: c.ManageCRDs,
},
}
o.alrtInf = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: o.mclient.MonitoringV1().Alertmanagers(o.config.Namespace).List,
WatchFunc: o.mclient.MonitoringV1().Alertmanagers(o.config.Namespace).Watch,
},
&monitoringv1.Alertmanager{}, resyncPeriod, cache.Indexers{},
)
o.ssetInf = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(o.kclient.AppsV1beta2().RESTClient(), "statefulsets", o.config.Namespace, fields.Everything()),
&appsv1.StatefulSet{}, resyncPeriod, cache.Indexers{},
)
o.alrtInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleAlertmanagerAdd,
DeleteFunc: o.handleAlertmanagerDelete,
UpdateFunc: o.handleAlertmanagerUpdate,
})
o.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.handleStatefulSetAdd,
DeleteFunc: o.handleStatefulSetDelete,
UpdateFunc: o.handleStatefulSetUpdate,
})
return o, nil
}
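
// RegisterMetrics registers the operator's metrics with the given registerer:
// a counter of reconciliation errors and a collector exposing the state of
// all Alertmanager objects in the informer's store.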
func (c *Operator) RegisterMetrics(r prometheus.Registerer) {
c.statefulsetErrors = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_operator_alertmanager_reconcile_errors_total",
Help: "Number of errors that occurred while reconciling the alertmanager statefulset",
})
r.MustRegister(
c.statefulsetErrors,
NewAlertmanagerCollector(c.alrtInf.GetStore()),
)
}

// Run starts the controller and blocks until the stop channel is closed.
func (c *Operator) Run(stopc <-chan struct{}) error {
defer c.queue.ShutDown()
errChan := make(chan error)
go func() {
v, err := c.kclient.Discovery().ServerVersion()
if err != nil {
errChan <- errors.Wrap(err, "communicating with server failed")
return
}
level.Info(c.logger).Log("msg", "connection established", "cluster-version", v)
if c.config.ManageCRDs {
if err := c.createCRDs(); err != nil {
errChan <- err
return
}
}
errChan <- nil
}()
select {
case err := <-errChan:
if err != nil {
return err
}
level.Info(c.logger).Log("msg", "CRD API endpoints ready")
case <-stopc:
return nil
}
go c.worker()
go c.alrtInf.Run(stopc)
go c.ssetInf.Run(stopc)
<-stopc
return nil
}
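
// keyFunc derives the "namespace/name" cache key for the given object,
// tolerating tombstones of deleted objects.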
func (c *Operator) keyFunc(obj interface{}) (string, bool) {
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
level.Error(c.logger).Log("msg", "creating key failed", "err", err)
return k, false
}
return k, true
}
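
// getObject unwraps a potential DeletedFinalStateUnknown tombstone and
// returns the object's metadata accessor.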
func (c *Operator) getObject(obj interface{}) (metav1.Object, bool) {
ts, ok := obj.(cache.DeletedFinalStateUnknown)
if ok {
obj = ts.Obj
}
o, err := meta.Accessor(obj)
if err != nil {
level.Error(c.logger).Log("msg", "get object failed", "err", err)
return nil, false
}
return o, true
}

// enqueue adds a key to the queue. If obj is a key already it gets added directly.
// Otherwise, the key is extracted via keyFunc.
func (c *Operator) enqueue(obj interface{}) {
if obj == nil {
return
}
key, ok := obj.(string)
if !ok {
key, ok = c.keyFunc(obj)
if !ok {
return
}
}
c.queue.Add(key)
}

// enqueueForNamespace enqueues all Alertmanager object keys that belong to the given namespace.
func (c *Operator) enqueueForNamespace(ns string) {
	err := cache.ListAll(c.alrtInf.GetStore(), labels.Everything(), func(obj interface{}) {
		am := obj.(*monitoringv1.Alertmanager)
		if am.Namespace == ns {
			c.enqueue(am)
		}
	})
	if err != nil {
		level.Error(c.logger).Log("msg", "listing all Alertmanager objects failed", "err", err)
	}
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (c *Operator) worker() {
for c.processNextWorkItem() {
}
}
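
// processNextWorkItem dequeues a single key, syncs it, and requeues it with
// rate limiting if the sync fails. It returns false once the queue shuts down.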
func (c *Operator) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.sync(key.(string))
if err == nil {
c.queue.Forget(key)
return true
}
c.statefulsetErrors.Inc()
	utilruntime.HandleError(errors.Wrapf(err, "Sync %q failed", key))
c.queue.AddRateLimited(key)
return true
}
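
// alertmanagerForStatefulSet returns the Alertmanager resource that governs
// the given StatefulSet, or nil if none is found in the cache.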
func (c *Operator) alertmanagerForStatefulSet(sset interface{}) *monitoringv1.Alertmanager {
key, ok := c.keyFunc(sset)
if !ok {
return nil
}
aKey := statefulSetKeyToAlertmanagerKey(key)
a, exists, err := c.alrtInf.GetStore().GetByKey(aKey)
if err != nil {
level.Error(c.logger).Log("msg", "Alertmanager lookup failed", "err", err)
return nil
}
if !exists {
return nil
}
return a.(*monitoringv1.Alertmanager)
}
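
// The helpers below translate between names and cache keys of Alertmanager
// resources and the StatefulSets generated for them, which carry an
// "alertmanager-" prefix. For example:
//
//	statefulSetNameFromAlertmanagerName("main") == "alertmanager-main"
//	statefulSetKeyToAlertmanagerKey("monitoring/alertmanager-main") == "monitoring/main"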
func alertmanagerNameFromStatefulSetName(name string) string {
	return strings.TrimPrefix(name, "alertmanager-")
}

func statefulSetNameFromAlertmanagerName(name string) string {
	return "alertmanager-" + name
}

func statefulSetKeyToAlertmanagerKey(key string) string {
	keyParts := strings.Split(key, "/")
	return keyParts[0] + "/" + strings.TrimPrefix(keyParts[1], "alertmanager-")
}

func alertmanagerKeyToStatefulSetKey(key string) string {
	keyParts := strings.Split(key, "/")
	return keyParts[0] + "/alertmanager-" + keyParts[1]
}

func (c *Operator) handleAlertmanagerAdd(obj interface{}) {
key, ok := c.keyFunc(obj)
if !ok {
return
}
level.Debug(c.logger).Log("msg", "Alertmanager added", "key", key)
c.enqueue(key)
}

func (c *Operator) handleAlertmanagerDelete(obj interface{}) {
key, ok := c.keyFunc(obj)
if !ok {
return
}
level.Debug(c.logger).Log("msg", "Alertmanager deleted", "key", key)
c.enqueue(key)
}

func (c *Operator) handleAlertmanagerUpdate(old, cur interface{}) {
key, ok := c.keyFunc(cur)
if !ok {
return
}
level.Debug(c.logger).Log("msg", "Alertmanager updated", "key", key)
c.enqueue(key)
}
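
// The StatefulSet handlers below map events on generated StatefulSets back to
// the owning Alertmanager resource and enqueue it for reconciliation.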
func (c *Operator) handleStatefulSetDelete(obj interface{}) {
if a := c.alertmanagerForStatefulSet(obj); a != nil {
c.enqueue(a)
}
}

func (c *Operator) handleStatefulSetAdd(obj interface{}) {
if a := c.alertmanagerForStatefulSet(obj); a != nil {
c.enqueue(a)
}
}

func (c *Operator) handleStatefulSetUpdate(oldo, curo interface{}) {
old := oldo.(*appsv1.StatefulSet)
cur := curo.(*appsv1.StatefulSet)
level.Debug(c.logger).Log("msg", "update handler", "old", old.ResourceVersion, "cur", cur.ResourceVersion)
	// Periodic resyncs may resend the StatefulSet without changes in-between.
	// This also breaks loops created by updating the resource ourselves.
if old.ResourceVersion == cur.ResourceVersion {
return
}
	// Wake up the Alertmanager resource that the StatefulSet belongs to.
if a := c.alertmanagerForStatefulSet(cur); a != nil {
c.enqueue(a)
}
}
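
// sync reconciles the Alertmanager resource identified by key: it ensures the
// governing Service exists and creates or updates the backing StatefulSet. If
// the resource no longer exists, the generated objects are torn down.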
func (c *Operator) sync(key string) error {
obj, exists, err := c.alrtInf.GetIndexer().GetByKey(key)
if err != nil {
return err
}
if !exists {
		// TODO(fabxc): we want to do server-side deletion due to the
		// variety of resources we create.
		// Doing so just based on the deletion event is not reliable, so
		// we have to garbage collect the controller-created resources in
		// some other way.
		//
		// For now, rely on the index key matching that of the created
		// configmap and stateful set. This does not work if Alertmanager
		// resources are deleted while the controller is not running; that
		// could be solved via garbage collection later.
return c.destroyAlertmanager(key)
}
am := obj.(*monitoringv1.Alertmanager)
if am.Spec.Paused {
return nil
}
level.Info(c.logger).Log("msg", "sync alertmanager", "key", key)
// Create governing service if it doesn't exist.
svcClient := c.kclient.Core().Services(am.Namespace)
if err = k8sutil.CreateOrUpdateService(svcClient, makeStatefulSetService(am, c.config)); err != nil {
return errors.Wrap(err, "synchronizing governing service failed")
}
ssetClient := c.kclient.AppsV1beta2().StatefulSets(am.Namespace)
// Ensure we have a StatefulSet running Alertmanager deployed.
obj, exists, err = c.ssetInf.GetIndexer().GetByKey(alertmanagerKeyToStatefulSetKey(key))
if err != nil {
return errors.Wrap(err, "retrieving statefulset failed")
}
if !exists {
sset, err := makeStatefulSet(am, nil, c.config)
if err != nil {
return errors.Wrap(err, "making the statefulset, to create, failed")
}
if _, err := ssetClient.Create(sset); err != nil {
return errors.Wrap(err, "creating statefulset failed")
}
return nil
}
sset, err := makeStatefulSet(am, obj.(*appsv1.StatefulSet), c.config)
if err != nil {
return errors.Wrap(err, "making the statefulset, to update, failed")
}
if _, err := ssetClient.Update(sset); err != nil {
return errors.Wrap(err, "updating statefulset failed")
}
return nil
}
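
// ListOptions returns list options selecting the pods that belong to the
// Alertmanager instance with the given name.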
func ListOptions(name string) metav1.ListOptions {
return metav1.ListOptions{
LabelSelector: fields.SelectorFromSet(fields.Set(map[string]string{
"app": "alertmanager",
"alertmanager": name,
})).String(),
}
}
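
// AlertmanagerStatus evaluates the current status of the given Alertmanager
// deployment by inspecting its pods. It returns the status along with the
// pods that still run an outdated pod template.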
func AlertmanagerStatus(kclient kubernetes.Interface, a *monitoringv1.Alertmanager) (*monitoringv1.AlertmanagerStatus, []v1.Pod, error) {
res := &monitoringv1.AlertmanagerStatus{Paused: a.Spec.Paused}
pods, err := kclient.Core().Pods(a.Namespace).List(ListOptions(a.Name))
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving pods of failed")
}
sset, err := kclient.AppsV1beta2().StatefulSets(a.Namespace).Get(statefulSetNameFromAlertmanagerName(a.Name), metav1.GetOptions{})
if err != nil {
return nil, nil, errors.Wrap(err, "retrieving stateful set failed")
}
res.Replicas = int32(len(pods.Items))
var oldPods []v1.Pod
for _, pod := range pods.Items {
ready, err := k8sutil.PodRunningAndReady(pod)
if err != nil {
return nil, nil, errors.Wrap(err, "cannot determine pod ready state")
}
if ready {
res.AvailableReplicas++
// TODO(fabxc): detect other fields of the pod template that are mutable.
if needsUpdate(&pod, sset.Spec.Template) {
oldPods = append(oldPods, pod)
} else {
res.UpdatedReplicas++
}
continue
}
res.UnavailableReplicas++
}
return res, oldPods, nil
}
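
// needsUpdate reports whether the pod's first container diverges from the
// StatefulSet's pod template in image or arguments.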
func needsUpdate(pod *v1.Pod, tmpl v1.PodTemplateSpec) bool {
c1 := pod.Spec.Containers[0]
c2 := tmpl.Spec.Containers[0]
if c1.Image != c2.Image {
return true
}
if !reflect.DeepEqual(c1.Args, c2.Args) {
return true
}
return false
}

// TODO(brancz): Remove this function once Kubernetes 1.7 compatibility is dropped.
// Starting with Kubernetes 1.8 OwnerReferences are properly handled for CRDs.
func (c *Operator) destroyAlertmanager(key string) error {
ssetKey := alertmanagerKeyToStatefulSetKey(key)
obj, exists, err := c.ssetInf.GetStore().GetByKey(ssetKey)
if err != nil {
return errors.Wrap(err, "retrieving statefulset from cache failed")
}
if !exists {
return nil
}
	// Deep-copy to avoid mutating the object in the shared informer cache.
	sset := obj.(*appsv1.StatefulSet).DeepCopy()
	*sset.Spec.Replicas = 0

	// Update the replica count to 0 and wait for all pods to be deleted.
ssetClient := c.kclient.AppsV1beta2().StatefulSets(sset.Namespace)
if _, err := ssetClient.Update(sset); err != nil {
return errors.Wrap(err, "updating statefulset for scale-down failed")
}
podClient := c.kclient.Core().Pods(sset.Namespace)
// TODO(fabxc): temporary solution until StatefulSet status provides necessary info to know
// whether scale-down completed.
for {
pods, err := podClient.List(ListOptions(alertmanagerNameFromStatefulSetName(sset.Name)))
if err != nil {
return errors.Wrap(err, "retrieving pods of statefulset failed")
}
if len(pods.Items) == 0 {
break
}
time.Sleep(50 * time.Millisecond)
}
// StatefulSet scaled down, we can delete it.
if err := ssetClient.Delete(sset.Name, nil); err != nil {
return errors.Wrap(err, "deleting statefulset failed")
}
return nil
}
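
// createCRDs creates or updates the Alertmanager CustomResourceDefinition and
// waits until its API endpoints become ready.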
func (c *Operator) createCRDs() error {
crds := []*extensionsobj.CustomResourceDefinition{
k8sutil.NewCustomResourceDefinition(c.config.CrdKinds.Alertmanager, c.config.CrdGroup, c.config.Labels.LabelsMap, c.config.EnableValidation),
}
crdClient := c.crdclient.ApiextensionsV1beta1().CustomResourceDefinitions()
for _, crd := range crds {
oldCRD, err := crdClient.Get(crd.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "getting CRD: %s", crd.Spec.Names.Kind)
}
if apierrors.IsNotFound(err) {
if _, err := crdClient.Create(crd); err != nil {
return errors.Wrapf(err, "creating CRD: %s", crd.Spec.Names.Kind)
}
level.Info(c.logger).Log("msg", "CRD created", "crd", crd.Spec.Names.Kind)
}
if err == nil {
crd.ResourceVersion = oldCRD.ResourceVersion
if _, err := crdClient.Update(crd); err != nil {
return errors.Wrapf(err, "creating CRD: %s", crd.Spec.Names.Kind)
}
level.Info(c.logger).Log("msg", "CRD updated", "crd", crd.Spec.Names.Kind)
}
}
crdListFuncs := []struct {
name string
listFunc func(opts metav1.ListOptions) (runtime.Object, error)
}{
{"Alertmanager", c.mclient.MonitoringV1().Alertmanagers(c.config.Namespace).List},
}
for _, crdListFunc := range crdListFuncs {
err := k8sutil.WaitForCRDReady(crdListFunc.listFunc)
if err != nil {
return errors.Wrapf(err, "waiting for %v crd failed", crdListFunc.name)
}
}
return nil
}