mirror of
https://github.com/prometheus-operator/prometheus-operator.git
synced 2025-04-21 11:48:53 +00:00
*: add alertmanager operator
This commit is contained in:
parent
a48a2a12fc
commit
0059a53255
15 changed files with 987 additions and 12 deletions
cmd
example
pkg
alertmanager
analytics
prometheus
spec
|
@ -18,13 +18,17 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/coreos/prometheus-operator/pkg/alertmanager"
|
||||
"github.com/coreos/prometheus-operator/pkg/analytics"
|
||||
"github.com/coreos/prometheus-operator/pkg/operator"
|
||||
"github.com/coreos/prometheus-operator/pkg/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg operator.Config
|
||||
cfg prometheus.Config
|
||||
analyticsEnabled bool
|
||||
)
|
||||
|
||||
|
@ -41,17 +45,60 @@ func init() {
|
|||
flagset.Parse(os.Args[1:])
|
||||
}
|
||||
|
||||
func main() {
|
||||
func Main() int {
|
||||
if analyticsEnabled {
|
||||
analytics.Enable()
|
||||
}
|
||||
c, err := operator.New(cfg)
|
||||
|
||||
po, err := prometheus.New(cfg)
|
||||
if err != nil {
|
||||
fmt.Fprint(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
return 1
|
||||
}
|
||||
if err := c.Run(make(chan struct{})); err != nil {
|
||||
|
||||
ao, err := alertmanager.New(cfg)
|
||||
if err != nil {
|
||||
fmt.Fprint(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
return 1
|
||||
}
|
||||
|
||||
stopc := make(chan struct{})
|
||||
errc := make(chan error)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if err := po.Run(stopc); err != nil {
|
||||
errc <- err
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if err := ao.Run(stopc); err != nil {
|
||||
errc <- err
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
term := make(chan os.Signal)
|
||||
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
|
||||
select {
|
||||
case <-term:
|
||||
fmt.Fprint(os.Stdout, "Received SIGTERM, exiting gracefully...")
|
||||
close(stopc)
|
||||
wg.Wait()
|
||||
case <-errc:
|
||||
fmt.Fprintf(os.Stderr, "Unhandled error received. Exiting...")
|
||||
close(stopc)
|
||||
wg.Wait()
|
||||
return 1
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func main() {
|
||||
os.Exit(Main())
|
||||
}
|
||||
|
|
57
cmd/prometheus-operator/main.go
Normal file
57
cmd/prometheus-operator/main.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
// Copyright 2016 The prometheus-operator Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/coreos/prometheus-operator/pkg/analytics"
|
||||
"github.com/coreos/prometheus-operator/pkg/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg prometheus.Config
|
||||
analyticsEnabled bool
|
||||
)
|
||||
|
||||
func init() {
|
||||
flagset := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
|
||||
|
||||
flagset.StringVar(&cfg.Host, "apiserver", "", "API Server addr, e.g. ' - NOT RECOMMENDED FOR PRODUCTION - http://127.0.0.1:8080'. Omit parameter to run in on-cluster mode and utilize the service account token.")
|
||||
flagset.StringVar(&cfg.TLSConfig.CertFile, "cert-file", "", " - NOT RECOMMENDED FOR PRODUCTION - Path to public TLS certificate file.")
|
||||
flagset.StringVar(&cfg.TLSConfig.KeyFile, "key-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to private TLS certificate file.")
|
||||
flagset.StringVar(&cfg.TLSConfig.CAFile, "ca-file", "", "- NOT RECOMMENDED FOR PRODUCTION - Path to TLS CA file.")
|
||||
flagset.BoolVar(&cfg.TLSInsecure, "tls-insecure", false, "- NOT RECOMMENDED FOR PRODUCTION - Don't verify API server's CA certificate.")
|
||||
flagset.BoolVar(&analyticsEnabled, "analytics", true, "Send analytical event (Cluster Created/Deleted etc.) to Google Analytics")
|
||||
|
||||
flagset.Parse(os.Args[1:])
|
||||
}
|
||||
|
||||
func main() {
|
||||
if analyticsEnabled {
|
||||
analytics.Enable()
|
||||
}
|
||||
c, err := prometheus.New(cfg)
|
||||
if err != nil {
|
||||
fmt.Fprint(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if err := c.Run(make(chan struct{})); err != nil {
|
||||
fmt.Fprint(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
18
example/alertmanager-config.yaml
Normal file
18
example/alertmanager-config.yaml
Normal file
|
@ -0,0 +1,18 @@
|
|||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: alertmanager-main
|
||||
data:
|
||||
alertmanager.yaml: |-
|
||||
global:
|
||||
resolve_timeout: 5m
|
||||
route:
|
||||
group_by: ['job']
|
||||
group_wait: 30s
|
||||
group_interval: 5m
|
||||
repeat_interval: 12h
|
||||
receiver: 'webhook'
|
||||
receivers:
|
||||
- name: 'webhook'
|
||||
webhook_configs:
|
||||
- url: 'http://alertmanagerwh:30500/'
|
14
example/alertmanager-service.yaml
Normal file
14
example/alertmanager-service.yaml
Normal file
|
@ -0,0 +1,14 @@
|
|||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: alertmanager-main
|
||||
spec:
|
||||
type: NodePort
|
||||
ports:
|
||||
- name: web
|
||||
nodePort: 30903
|
||||
port: 9093
|
||||
protocol: TCP
|
||||
targetPort: web
|
||||
selector:
|
||||
alertmanager: alertmanager-main
|
9
example/alertmanager.yaml
Normal file
9
example/alertmanager.yaml
Normal file
|
@ -0,0 +1,9 @@
|
|||
apiVersion: "monitoring.coreos.com/v1alpha1"
|
||||
kind: "Alertmanager"
|
||||
metadata:
|
||||
name: "alertmanager-main"
|
||||
labels:
|
||||
alertmanager: "main"
|
||||
spec:
|
||||
replicas: 3
|
||||
version: v0.5.0
|
|
@ -17,4 +17,4 @@ spec:
|
|||
alerting:
|
||||
alertmanagers:
|
||||
- namespace: monitoring
|
||||
name: alertmanager-cluster
|
||||
name: alertmanager-main
|
||||
|
|
98
pkg/alertmanager/client.go
Normal file
98
pkg/alertmanager/client.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
// Copyright 2016 The prometheus-operator Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/prometheus-operator/pkg/spec"
|
||||
"k8s.io/client-go/1.5/pkg/api"
|
||||
"k8s.io/client-go/1.5/pkg/api/unversioned"
|
||||
"k8s.io/client-go/1.5/pkg/runtime"
|
||||
"k8s.io/client-go/1.5/pkg/runtime/serializer"
|
||||
"k8s.io/client-go/1.5/pkg/watch"
|
||||
"k8s.io/client-go/1.5/rest"
|
||||
"k8s.io/client-go/1.5/tools/cache"
|
||||
)
|
||||
|
||||
const resyncPeriod = 5 * time.Minute
|
||||
|
||||
func newAlertmanagerRESTClient(c rest.Config) (*rest.RESTClient, error) {
|
||||
c.APIPath = "/apis"
|
||||
c.GroupVersion = &unversioned.GroupVersion{
|
||||
Group: "monitoring.coreos.com",
|
||||
Version: "v1alpha1",
|
||||
}
|
||||
// TODO(fabxc): is this even used with our custom list/watch functions?
|
||||
c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
|
||||
return rest.RESTClientFor(&c)
|
||||
}
|
||||
|
||||
// alertmanagerDecoder decodes a JSON stream of watch events carrying
// Alertmanager objects. It provides the Decode and Close methods handed to
// watch.NewStreamWatcher.
type alertmanagerDecoder struct {
	// dec reads successive JSON values from the watch stream.
	dec *json.Decoder
	// close releases the underlying stream.
	close func() error
}
|
||||
|
||||
func (d *alertmanagerDecoder) Close() {
|
||||
d.close()
|
||||
}
|
||||
|
||||
func (d *alertmanagerDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
|
||||
var e struct {
|
||||
Type watch.EventType
|
||||
Object spec.Alertmanager
|
||||
}
|
||||
if err := d.dec.Decode(&e); err != nil {
|
||||
return watch.Error, nil, err
|
||||
}
|
||||
return e.Type, &e.Object, nil
|
||||
}
|
||||
|
||||
// NewAlertmanagerListWatch returns a new ListWatch on the Alertmanager resource.
|
||||
func NewAlertmanagerListWatch(client *rest.RESTClient) *cache.ListWatch {
|
||||
return &cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
req := client.Get().
|
||||
Namespace(api.NamespaceAll).
|
||||
Resource("alertmanagers").
|
||||
// VersionedParams(&options, api.ParameterCodec)
|
||||
FieldsSelectorParam(nil)
|
||||
|
||||
b, err := req.DoRaw()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var p spec.AlertmanagerList
|
||||
return &p, json.Unmarshal(b, &p)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
r, err := client.Get().
|
||||
Prefix("watch").
|
||||
Namespace(api.NamespaceAll).
|
||||
Resource("alertmanagers").
|
||||
// VersionedParams(&options, api.ParameterCodec).
|
||||
FieldsSelectorParam(nil).
|
||||
Stream()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return watch.NewStreamWatcher(&alertmanagerDecoder{
|
||||
dec: json.NewDecoder(r),
|
||||
close: r.Close,
|
||||
}), nil
|
||||
},
|
||||
}
|
||||
}
|
489
pkg/alertmanager/operator.go
Normal file
489
pkg/alertmanager/operator.go
Normal file
|
@ -0,0 +1,489 @@
|
|||
// Copyright 2016 The prometheus-operator Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/prometheus-operator/pkg/analytics"
|
||||
"github.com/coreos/prometheus-operator/pkg/prometheus"
|
||||
"github.com/coreos/prometheus-operator/pkg/spec"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"k8s.io/client-go/1.5/kubernetes"
|
||||
"k8s.io/client-go/1.5/pkg/api"
|
||||
apierrors "k8s.io/client-go/1.5/pkg/api/errors"
|
||||
"k8s.io/client-go/1.5/pkg/api/v1"
|
||||
"k8s.io/client-go/1.5/pkg/apis/apps/v1alpha1"
|
||||
extensionsobj "k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"
|
||||
"k8s.io/client-go/1.5/pkg/labels"
|
||||
utilruntime "k8s.io/client-go/1.5/pkg/util/runtime"
|
||||
"k8s.io/client-go/1.5/pkg/util/wait"
|
||||
"k8s.io/client-go/1.5/rest"
|
||||
"k8s.io/client-go/1.5/tools/cache"
|
||||
)
|
||||
|
||||
const (
|
||||
tprAlertmanager = "alertmanager.monitoring.coreos.com"
|
||||
)
|
||||
|
||||
// Operator manages lify cycle of Alertmanager deployments and
|
||||
// monitoring configurations.
|
||||
type Operator struct {
|
||||
kclient *kubernetes.Clientset
|
||||
pclient *rest.RESTClient
|
||||
logger log.Logger
|
||||
|
||||
alrtInf cache.SharedIndexInformer
|
||||
psetInf cache.SharedIndexInformer
|
||||
|
||||
queue *queue
|
||||
|
||||
host string
|
||||
}
|
||||
|
||||
// New creates a new controller.
|
||||
func New(c prometheus.Config) (*Operator, error) {
|
||||
cfg, err := newClusterConfig(c.Host, c.TLSInsecure, &c.TLSConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client, err := kubernetes.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logger := log.NewContext(log.NewLogfmtLogger(os.Stdout)).
|
||||
With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
||||
|
||||
promclient, err := newAlertmanagerRESTClient(*cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Operator{
|
||||
kclient: client,
|
||||
pclient: promclient,
|
||||
logger: logger,
|
||||
queue: newQueue(200),
|
||||
host: cfg.Host,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run the controller.
|
||||
func (c *Operator) Run(stopc <-chan struct{}) error {
|
||||
defer c.queue.close()
|
||||
go c.worker()
|
||||
|
||||
v, err := c.kclient.Discovery().ServerVersion()
|
||||
if err != nil {
|
||||
return fmt.Errorf("communicating with server failed: %s", err)
|
||||
}
|
||||
c.logger.Log("msg", "connection established", "cluster-version", v)
|
||||
|
||||
if err := c.createTPRs(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.alrtInf = cache.NewSharedIndexInformer(
|
||||
NewAlertmanagerListWatch(c.pclient),
|
||||
&spec.Alertmanager{}, resyncPeriod, cache.Indexers{},
|
||||
)
|
||||
c.psetInf = cache.NewSharedIndexInformer(
|
||||
cache.NewListWatchFromClient(c.kclient.Apps().GetRESTClient(), "petsets", api.NamespaceAll, nil),
|
||||
&v1alpha1.PetSet{}, resyncPeriod, cache.Indexers{},
|
||||
)
|
||||
|
||||
c.alrtInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(p interface{}) {
|
||||
c.logger.Log("msg", "enqueueAlertmanager", "trigger", "alertmanager add")
|
||||
analytics.AlertmanagerCreated()
|
||||
c.enqueueAlertmanager(p)
|
||||
},
|
||||
DeleteFunc: func(p interface{}) {
|
||||
c.logger.Log("msg", "enqueueAlertmanager", "trigger", "alertmanager del")
|
||||
analytics.AlertmanagerDeleted()
|
||||
c.enqueueAlertmanager(p)
|
||||
},
|
||||
UpdateFunc: func(_, p interface{}) {
|
||||
c.logger.Log("msg", "enqueueAlertmanager", "trigger", "alertmanager update")
|
||||
c.enqueueAlertmanager(p)
|
||||
},
|
||||
})
|
||||
c.psetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(d interface{}) {
|
||||
c.logger.Log("msg", "addPetSet", "trigger", "petset add")
|
||||
c.addPetSet(d)
|
||||
},
|
||||
DeleteFunc: func(d interface{}) {
|
||||
c.logger.Log("msg", "deletePetSet", "trigger", "petset delete")
|
||||
c.deletePetSet(d)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
c.logger.Log("msg", "updatePetSet", "trigger", "petset update")
|
||||
c.updatePetSet(old, cur)
|
||||
},
|
||||
})
|
||||
|
||||
go c.alrtInf.Run(stopc)
|
||||
go c.psetInf.Run(stopc)
|
||||
|
||||
for !c.alrtInf.HasSynced() || !c.psetInf.HasSynced() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
<-stopc
|
||||
return nil
|
||||
}
|
||||
|
||||
type queue struct {
|
||||
ch chan *spec.Alertmanager
|
||||
}
|
||||
|
||||
func newQueue(size int) *queue {
|
||||
return &queue{ch: make(chan *spec.Alertmanager, size)}
|
||||
}
|
||||
|
||||
func (q *queue) add(p *spec.Alertmanager) { q.ch <- p }
|
||||
func (q *queue) close() { close(q.ch) }
|
||||
|
||||
func (q *queue) pop() (*spec.Alertmanager, bool) {
|
||||
p, ok := <-q.ch
|
||||
return p, ok
|
||||
}
|
||||
|
||||
var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
|
||||
|
||||
func (c *Operator) enqueueAlertmanager(p interface{}) {
|
||||
c.queue.add(p.(*spec.Alertmanager))
|
||||
}
|
||||
|
||||
func (c *Operator) enqueueAll() {
|
||||
cache.ListAll(c.alrtInf.GetStore(), labels.Everything(), func(o interface{}) {
|
||||
c.enqueueAlertmanager(o.(*spec.Alertmanager))
|
||||
})
|
||||
}
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (c *Operator) worker() {
|
||||
for {
|
||||
p, ok := c.queue.pop()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if err := c.reconcile(p); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("reconciliation failed: %s", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Operator) alertmanagerForPetSet(p *v1alpha1.PetSet) *spec.Alertmanager {
|
||||
key, err := keyFunc(p)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("creating key: %s", err))
|
||||
return nil
|
||||
}
|
||||
// Namespace/Name are one-to-one so the key will find the respective Alertmanager resource.
|
||||
a, exists, err := c.alrtInf.GetStore().GetByKey(key)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("get Alertmanager resource: %s", err))
|
||||
return nil
|
||||
}
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
return a.(*spec.Alertmanager)
|
||||
}
|
||||
|
||||
func (c *Operator) deletePetSet(o interface{}) {
|
||||
p := o.(*v1alpha1.PetSet)
|
||||
// Wake up Alertmanager resource the deployment belongs to.
|
||||
if a := c.alertmanagerForPetSet(p); a != nil {
|
||||
c.enqueueAlertmanager(a)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Operator) addPetSet(o interface{}) {
|
||||
p := o.(*v1alpha1.PetSet)
|
||||
// Wake up Alertmanager resource the deployment belongs to.
|
||||
if a := c.alertmanagerForPetSet(p); a != nil {
|
||||
c.enqueueAlertmanager(a)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Operator) updatePetSet(oldo, curo interface{}) {
|
||||
old := oldo.(*v1alpha1.PetSet)
|
||||
cur := curo.(*v1alpha1.PetSet)
|
||||
|
||||
c.logger.Log("msg", "update handler", "old", old.ResourceVersion, "cur", cur.ResourceVersion)
|
||||
|
||||
// Periodic resync may resend the deployment without changes in-between.
|
||||
// Also breaks loops created by updating the resource ourselves.
|
||||
if old.ResourceVersion == cur.ResourceVersion {
|
||||
return
|
||||
}
|
||||
|
||||
// Wake up Alertmanager resource the deployment belongs to.
|
||||
if a := c.alertmanagerForPetSet(cur); a != nil {
|
||||
c.enqueueAlertmanager(a)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Operator) reconcile(p *spec.Alertmanager) error {
|
||||
key, err := keyFunc(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.logger.Log("msg", "reconcile alertmanager", "key", key)
|
||||
|
||||
_, exists, err := c.alrtInf.GetStore().GetByKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
// TODO(fabxc): we want to do server side deletion due to the variety of
|
||||
// resources we create.
|
||||
// Doing so just based on the deletion event is not reliable, so
|
||||
// we have to garbage collect the controller-created resources in some other way.
|
||||
//
|
||||
// Let's rely on the index key matching that of the created configmap and replica
|
||||
// set for now. This does not work if we delete Alertmanager resources as the
|
||||
// controller is not running – that could be solved via garbage collection later.
|
||||
return c.deleteAlertmanager(p)
|
||||
}
|
||||
|
||||
// Create governing service if it doesn't exist.
|
||||
svcClient := c.kclient.Core().Services(p.Namespace)
|
||||
if _, err := svcClient.Create(makePetSetService(p)); err != nil && !apierrors.IsAlreadyExists(err) {
|
||||
return fmt.Errorf("create petset service: %s", err)
|
||||
}
|
||||
|
||||
psetClient := c.kclient.Apps().PetSets(p.Namespace)
|
||||
psetQ := &v1alpha1.PetSet{}
|
||||
psetQ.Namespace = p.Namespace
|
||||
psetQ.Name = p.Name
|
||||
obj, exists, err := c.psetInf.GetStore().Get(psetQ)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !exists {
|
||||
if _, err := psetClient.Create(makePetSet(p.Namespace, p, nil)); err != nil {
|
||||
return fmt.Errorf("create petset: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if _, err := psetClient.Update(makePetSet(p.Namespace, p, obj.(*v1alpha1.PetSet))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.syncVersion(p)
|
||||
}
|
||||
|
||||
func podRunningAndReady(pod v1.Pod) (bool, error) {
|
||||
switch pod.Status.Phase {
|
||||
case v1.PodFailed, v1.PodSucceeded:
|
||||
return false, fmt.Errorf("pod completed")
|
||||
case v1.PodRunning:
|
||||
for _, cond := range pod.Status.Conditions {
|
||||
if cond.Type != v1.PodReady {
|
||||
continue
|
||||
}
|
||||
return cond.Status == v1.ConditionTrue, nil
|
||||
}
|
||||
return false, fmt.Errorf("pod ready conditation not found")
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// syncVersion ensures that all running pods for a Alertmanager have the required version.
|
||||
// It kills pods with the wrong version one-after-one and lets the PetSet controller
|
||||
// create new pods.
|
||||
//
|
||||
// TODO(fabxc): remove this once the PetSet controller learns how to do rolling updates.
|
||||
func (c *Operator) syncVersion(p *spec.Alertmanager) error {
|
||||
selector, err := labels.Parse("app=alertmanager,alertmanager=" + p.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
podClient := c.kclient.Core().Pods(p.Namespace)
|
||||
|
||||
Outer:
|
||||
for {
|
||||
pods, err := podClient.List(api.ListOptions{LabelSelector: selector})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(pods.Items) == 0 {
|
||||
return nil
|
||||
}
|
||||
for _, cp := range pods.Items {
|
||||
ready, err := podRunningAndReady(cp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ready {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
continue Outer
|
||||
}
|
||||
}
|
||||
var pod *v1.Pod
|
||||
for _, cp := range pods.Items {
|
||||
if !strings.HasSuffix(cp.Spec.Containers[0].Image, p.Spec.Version) {
|
||||
pod = &cp
|
||||
break
|
||||
}
|
||||
}
|
||||
if pod == nil {
|
||||
return nil
|
||||
}
|
||||
if err := podClient.Delete(pod.Name, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Operator) deleteAlertmanager(p *spec.Alertmanager) error {
|
||||
// Update the replica count to 0 and wait for all pods to be deleted.
|
||||
psetClient := c.kclient.Apps().PetSets(p.Namespace)
|
||||
|
||||
key, err := keyFunc(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
oldPsetO, _, err := c.psetInf.GetStore().GetByKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
oldPset := oldPsetO.(*v1alpha1.PetSet)
|
||||
zero := int32(0)
|
||||
oldPset.Spec.Replicas = &zero
|
||||
|
||||
if _, err := psetClient.Update(oldPset); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// XXX: Selecting by ObjectMeta.Name gives an error. So use the label for now.
|
||||
selector, err := labels.Parse("app=alertmanager,alertmanager=" + p.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
podClient := c.kclient.Core().Pods(p.Namespace)
|
||||
|
||||
// TODO(fabxc): temporary solution until PetSet status provides necessary info to know
|
||||
// whether scale-down completed.
|
||||
for {
|
||||
pods, err := podClient.List(api.ListOptions{LabelSelector: selector})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(pods.Items) == 0 {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Deployment scaled down, we can delete it.
|
||||
if err := psetClient.Delete(p.Name, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if err := c.kclient.Core().Services(p.Namespace).Delete(fmt.Sprintf("%s-petset", p.Name), nil); err != nil {
|
||||
// return err
|
||||
// }
|
||||
|
||||
// Delete the auto-generate configuration.
|
||||
// TODO(fabxc): add an ownerRef at creation so we don't delete config maps
|
||||
// manually created for Alertmanager servers with no ServiceMonitor selectors.
|
||||
cm := c.kclient.Core().ConfigMaps(p.Namespace)
|
||||
if err := cm.Delete(p.Name, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := cm.Delete(fmt.Sprintf("%s-rules", p.Name), nil); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Operator) createTPRs() error {
|
||||
tprs := []*extensionsobj.ThirdPartyResource{
|
||||
{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: tprAlertmanager,
|
||||
},
|
||||
Versions: []extensionsobj.APIVersion{
|
||||
{Name: "v1alpha1"},
|
||||
},
|
||||
Description: "Managed Alertmanager cluster",
|
||||
},
|
||||
}
|
||||
tprClient := c.kclient.Extensions().ThirdPartyResources()
|
||||
|
||||
for _, tpr := range tprs {
|
||||
if _, err := tprClient.Create(tpr); err != nil && !apierrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
c.logger.Log("msg", "TPR created", "tpr", tpr.Name)
|
||||
}
|
||||
|
||||
// We have to wait for the TPRs to be ready. Otherwise the initial watch may fail.
|
||||
return wait.Poll(3*time.Second, 30*time.Second, func() (bool, error) {
|
||||
resp, err := c.kclient.CoreClient.Client.Get(c.host + "/apis/monitoring.coreos.com/v1alpha1/alertmanagers")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
switch resp.StatusCode {
|
||||
case http.StatusOK:
|
||||
return true, nil
|
||||
case http.StatusNotFound: // not set up yet. wait.
|
||||
return false, nil
|
||||
default:
|
||||
return false, fmt.Errorf("invalid status code: %v", resp.Status)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func newClusterConfig(host string, tlsInsecure bool, tlsConfig *rest.TLSClientConfig) (*rest.Config, error) {
|
||||
var cfg *rest.Config
|
||||
var err error
|
||||
|
||||
if len(host) == 0 {
|
||||
if cfg, err = rest.InClusterConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
cfg = &rest.Config{
|
||||
Host: host,
|
||||
}
|
||||
hostURL, err := url.Parse(host)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing host url %s : %v", host, err)
|
||||
}
|
||||
if hostURL.Scheme == "https" {
|
||||
cfg.TLSClientConfig = *tlsConfig
|
||||
cfg.Insecure = tlsInsecure
|
||||
}
|
||||
}
|
||||
cfg.QPS = 100
|
||||
cfg.Burst = 100
|
||||
|
||||
return cfg, nil
|
||||
}
|
206
pkg/alertmanager/petset.go
Normal file
206
pkg/alertmanager/petset.go
Normal file
|
@ -0,0 +1,206 @@
|
|||
// Copyright 2016 The prometheus-operator Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/coreos/prometheus-operator/pkg/spec"
|
||||
"k8s.io/client-go/1.5/pkg/api/resource"
|
||||
"k8s.io/client-go/1.5/pkg/api/v1"
|
||||
"k8s.io/client-go/1.5/pkg/apis/apps/v1alpha1"
|
||||
"k8s.io/client-go/1.5/pkg/util/intstr"
|
||||
)
|
||||
|
||||
func makePetSet(namespace string, p *spec.Alertmanager, old *v1alpha1.PetSet) *v1alpha1.PetSet {
|
||||
// TODO(fabxc): is this the right point to inject defaults?
|
||||
// Ideally we would do it before storing but that's currently not possible.
|
||||
// Potentially an update handler on first insertion.
|
||||
|
||||
baseImage := p.Spec.BaseImage
|
||||
if baseImage == "" {
|
||||
baseImage = "quay.io/prometheus/alertmanager"
|
||||
}
|
||||
version := p.Spec.Version
|
||||
if version == "" {
|
||||
version = "v0.5.0"
|
||||
}
|
||||
replicas := p.Spec.Replicas
|
||||
if replicas < 1 {
|
||||
replicas = 1
|
||||
}
|
||||
image := fmt.Sprintf("%s:%s", baseImage, version)
|
||||
|
||||
petset := &v1alpha1.PetSet{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: p.Name,
|
||||
},
|
||||
Spec: makePetSetSpec(namespace, p.Name, image, version, replicas),
|
||||
}
|
||||
if vc := p.Spec.Storage; vc == nil {
|
||||
petset.Spec.Template.Spec.Volumes = append(petset.Spec.Template.Spec.Volumes, v1.Volume{
|
||||
Name: fmt.Sprintf("%s-db", p.Name),
|
||||
VolumeSource: v1.VolumeSource{
|
||||
EmptyDir: &v1.EmptyDirVolumeSource{},
|
||||
},
|
||||
})
|
||||
} else {
|
||||
pvc := v1.PersistentVolumeClaim{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-db", p.Name),
|
||||
},
|
||||
Spec: v1.PersistentVolumeClaimSpec{
|
||||
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
|
||||
Resources: vc.Resources,
|
||||
Selector: vc.Selector,
|
||||
},
|
||||
}
|
||||
if len(vc.Class) > 0 {
|
||||
pvc.ObjectMeta.Annotations = map[string]string{
|
||||
"volume.beta.kubernetes.io/storage-class": vc.Class,
|
||||
}
|
||||
}
|
||||
petset.Spec.VolumeClaimTemplates = append(petset.Spec.VolumeClaimTemplates, pvc)
|
||||
}
|
||||
|
||||
if old != nil {
|
||||
petset.Annotations = old.Annotations
|
||||
}
|
||||
return petset
|
||||
}
|
||||
|
||||
func makePetSetService(p *spec.Alertmanager) *v1.Service {
|
||||
svc := &v1.Service{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: "alertmanager",
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIP: "None",
|
||||
Ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "web",
|
||||
Port: 9093,
|
||||
TargetPort: intstr.FromInt(9093),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
{
|
||||
Name: "mesh",
|
||||
Port: 6783,
|
||||
TargetPort: intstr.FromInt(6783),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
Selector: map[string]string{
|
||||
"app": "alertmanager",
|
||||
},
|
||||
},
|
||||
}
|
||||
return svc
|
||||
}
|
||||
|
||||
func makePetSetSpec(ns, name, image, version string, replicas int32) v1alpha1.PetSetSpec {
|
||||
commands := []string{
|
||||
"/bin/alertmanager",
|
||||
fmt.Sprintf("-config.file=%s", "/etc/prometheus/config/alertmanager.yaml"),
|
||||
fmt.Sprintf("-web.listen-address=:%d", 9093),
|
||||
fmt.Sprintf("-mesh.listen-address=:%d", 6783),
|
||||
fmt.Sprintf("-storage.path=%s", "/etc/prometheus/data"),
|
||||
}
|
||||
for i := int32(0); i < replicas; i++ {
|
||||
commands = append(commands, fmt.Sprintf("-mesh.peer=%s-%d.%s.%s.svc", name, i, "alertmanager", ns))
|
||||
}
|
||||
|
||||
terminationGracePeriod := int64(0)
|
||||
return v1alpha1.PetSetSpec{
|
||||
ServiceName: "alertmanager",
|
||||
Replicas: &replicas,
|
||||
Template: v1.PodTemplateSpec{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"app": "alertmanager",
|
||||
"alertmanager": name,
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"pod.alpha.kubernetes.io/initialized": "true",
|
||||
},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
TerminationGracePeriodSeconds: &terminationGracePeriod,
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Command: commands,
|
||||
Name: name,
|
||||
Image: image,
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
Name: "web",
|
||||
ContainerPort: 9093,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
{
|
||||
Name: "mesh",
|
||||
ContainerPort: 6783,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
VolumeMounts: []v1.VolumeMount{
|
||||
{
|
||||
Name: "config-volume",
|
||||
MountPath: "/etc/prometheus/config",
|
||||
},
|
||||
{
|
||||
Name: fmt.Sprintf("%s-db", name),
|
||||
MountPath: "/var/prometheus/data",
|
||||
SubPath: "alertmanager-db",
|
||||
},
|
||||
},
|
||||
}, {
|
||||
Name: "config-reloader",
|
||||
Image: "jimmidyson/configmap-reload",
|
||||
Args: []string{
|
||||
"-webhook-url=http://localhost:9093/-/reload",
|
||||
"-volume-dir=/etc/prometheus/config",
|
||||
},
|
||||
VolumeMounts: []v1.VolumeMount{
|
||||
{
|
||||
Name: "config-volume",
|
||||
ReadOnly: true,
|
||||
MountPath: "/etc/prometheus/config",
|
||||
},
|
||||
},
|
||||
Resources: v1.ResourceRequirements{
|
||||
Limits: v1.ResourceList{
|
||||
v1.ResourceCPU: resource.MustParse("5m"),
|
||||
v1.ResourceMemory: resource.MustParse("10Mi"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
Name: "config-volume",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
ConfigMap: &v1.ConfigMapVolumeSource{
|
||||
LocalObjectReference: v1.LocalObjectReference{
|
||||
Name: name,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
|
@ -69,3 +69,11 @@ func PrometheusCreated() {
|
|||
func PrometheusDeleted() {
|
||||
send(ga.NewEvent(category, "prometheus_deleted"))
|
||||
}
|
||||
|
||||
func AlertmanagerCreated() {
|
||||
send(ga.NewEvent(category, "alertmanager_created"))
|
||||
}
|
||||
|
||||
func AlertmanagerDeleted() {
|
||||
send(ga.NewEvent(category, "alertmanager_deleted"))
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package operator
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"encoding/json"
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package operator
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"bytes"
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package operator
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"fmt"
|
|
@ -12,7 +12,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package operator
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"html/template"
|
|
@ -108,3 +108,32 @@ type ServiceMonitorList struct {
|
|||
|
||||
Items []*ServiceMonitor `json:"items"`
|
||||
}
|
||||
|
||||
type Alertmanager struct {
|
||||
unversioned.TypeMeta `json:",inline"`
|
||||
v1.ObjectMeta `json:"metadata,omitempty"`
|
||||
Spec AlertmanagerSpec `json:"spec"`
|
||||
}
|
||||
|
||||
type AlertmanagerSpec struct {
|
||||
// Version the cluster should be on.
|
||||
Version string `json:"version"`
|
||||
// Base image that is used to deploy pods.
|
||||
BaseImage string `json:"baseImage"`
|
||||
// Size is the expected size of the alertmanager cluster. The controller will
|
||||
// eventually make the size of the running cluster equal to the expected
|
||||
// size.
|
||||
Replicas int32 `json:"replicas"`
|
||||
// Storage is the definition of how storage will be used by the Alertmanager
|
||||
// instances.
|
||||
Storage *StorageSpec `json:"storage"`
|
||||
}
|
||||
|
||||
type AlertmanagerList struct {
|
||||
unversioned.TypeMeta `json:",inline"`
|
||||
// Standard list metadata
|
||||
// More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
|
||||
unversioned.ListMeta `json:"metadata,omitempty"`
|
||||
// Items is a list of third party objects
|
||||
Items []Alertmanager `json:"items"`
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue