mirror of
https://github.com/prometheus-operator/prometheus-operator.git
synced 2025-04-21 11:48:53 +00:00
*: add alertmanager status and e2e test for version migration
This commit is contained in:
parent
fde52882a1
commit
522c3a779f
8 changed files with 132 additions and 90 deletions
2
Makefile
2
Makefile
|
@ -2,7 +2,7 @@ all: build
|
|||
|
||||
REPO?=quay.io/coreos/prometheus-operator
|
||||
TAG?=$(shell git rev-parse --short HEAD)
|
||||
NAMESPACE?=prometheus-operator-e2e-tests-$(shell head /dev/urandom | tr -dc a-z0-9 | head -c 13 ; echo '')
|
||||
NAMESPACE?=prometheus-operator-e2e-tests-$(shell LC_CTYPE=C tr -dc a-z0-9 < /dev/urandom | head -c 13 ; echo '')
|
||||
|
||||
CLUSTER_IP?=$(shell minikube ip)
|
||||
|
||||
|
|
|
@ -76,19 +76,43 @@ func New(c prometheus.Config, logger log.Logger) (*Operator, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return &Operator{
|
||||
o := &Operator{
|
||||
kclient: client,
|
||||
mclient: mclient,
|
||||
logger: logger,
|
||||
queue: queue.New(),
|
||||
host: cfg.Host,
|
||||
}, nil
|
||||
}
|
||||
|
||||
o.alrtInf = cache.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: o.mclient.Alertmanagers(api.NamespaceAll).List,
|
||||
WatchFunc: o.mclient.Alertmanagers(api.NamespaceAll).Watch,
|
||||
},
|
||||
&v1alpha1.Alertmanager{}, resyncPeriod, cache.Indexers{},
|
||||
)
|
||||
o.ssetInf = cache.NewSharedIndexInformer(
|
||||
cache.NewListWatchFromClient(o.kclient.Apps().RESTClient(), "statefulsets", api.NamespaceAll, nil),
|
||||
&v1beta1.StatefulSet{}, resyncPeriod, cache.Indexers{},
|
||||
)
|
||||
|
||||
o.alrtInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: o.handleAlertmanagerAdd,
|
||||
DeleteFunc: o.handleAlertmanagerDelete,
|
||||
UpdateFunc: o.handleAlertmanagerUpdate,
|
||||
})
|
||||
o.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: o.handleStatefulSetAdd,
|
||||
DeleteFunc: o.handleStatefulSetDelete,
|
||||
UpdateFunc: o.handleStatefulSetUpdate,
|
||||
})
|
||||
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// Run the controller.
|
||||
func (c *Operator) Run(stopc <-chan struct{}) error {
|
||||
defer c.queue.ShutDown()
|
||||
go c.worker()
|
||||
|
||||
errChan := make(chan error)
|
||||
go func() {
|
||||
|
@ -116,28 +140,7 @@ func (c *Operator) Run(stopc <-chan struct{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
c.alrtInf = cache.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: c.mclient.Alertmanagers(api.NamespaceAll).List,
|
||||
WatchFunc: c.mclient.Alertmanagers(api.NamespaceAll).Watch,
|
||||
},
|
||||
&v1alpha1.Alertmanager{}, resyncPeriod, cache.Indexers{},
|
||||
)
|
||||
c.ssetInf = cache.NewSharedIndexInformer(
|
||||
cache.NewListWatchFromClient(c.kclient.Apps().RESTClient(), "statefulsets", api.NamespaceAll, nil),
|
||||
&v1beta1.StatefulSet{}, resyncPeriod, cache.Indexers{},
|
||||
)
|
||||
|
||||
c.alrtInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handleAlertmanagerAdd,
|
||||
DeleteFunc: c.handleAlertmanagerDelete,
|
||||
UpdateFunc: c.handleAlertmanagerUpdate,
|
||||
})
|
||||
c.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.handleStatefulSetAdd,
|
||||
DeleteFunc: c.handleStatefulSetDelete,
|
||||
UpdateFunc: c.handleStatefulSetUpdate,
|
||||
})
|
||||
go c.worker()
|
||||
|
||||
go c.alrtInf.Run(stopc)
|
||||
go c.ssetInf.Run(stopc)
|
||||
|
@ -319,6 +322,9 @@ func (c *Operator) sync(key string) error {
|
|||
}
|
||||
|
||||
am := obj.(*v1alpha1.Alertmanager)
|
||||
if am.Spec.Paused {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.logger.Log("msg", "sync alertmanager", "key", key)
|
||||
|
||||
|
@ -362,50 +368,29 @@ func ListOptions(name string) v1.ListOptions {
|
|||
// create new pods.
|
||||
//
|
||||
// TODO(fabxc): remove this once the StatefulSet controller learns how to do rolling updates.
|
||||
func (c *Operator) syncVersion(am *v1alpha1.Alertmanager) error {
|
||||
podClient := c.kclient.Core().Pods(am.Namespace)
|
||||
|
||||
pods, err := podClient.List(ListOptions(am.Name))
|
||||
func (c *Operator) syncVersion(a *v1alpha1.Alertmanager) error {
|
||||
status, oldPods, err := AlertmanagerStatus(c.kclient, a)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If the StatefulSet is still busy scaling, don't interfere by killing pods.
|
||||
// We enqueue ourselves again to until the StatefulSet is ready.
|
||||
if len(pods.Items) != int(am.Spec.Replicas) {
|
||||
if status.Replicas != a.Spec.Replicas {
|
||||
return fmt.Errorf("scaling in progress")
|
||||
}
|
||||
if len(pods.Items) == 0 {
|
||||
if status.Replicas == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var oldPods []*v1.Pod
|
||||
allReady := true
|
||||
// Only proceed if all existing pods are running and ready.
|
||||
for _, pod := range pods.Items {
|
||||
ready, err := k8sutil.PodRunningAndReady(pod)
|
||||
if err != nil {
|
||||
c.logger.Log("msg", "cannot determine pod ready state", "err", err)
|
||||
}
|
||||
if ready {
|
||||
// TODO(fabxc): detect other fields of the pod template that are mutable.
|
||||
if !strings.HasSuffix(pod.Spec.Containers[0].Image, am.Spec.Version) {
|
||||
oldPods = append(oldPods, &pod)
|
||||
}
|
||||
continue
|
||||
}
|
||||
allReady = false
|
||||
}
|
||||
|
||||
if len(oldPods) == 0 {
|
||||
return nil
|
||||
}
|
||||
if !allReady {
|
||||
if status.UnavailableReplicas > 0 {
|
||||
return fmt.Errorf("waiting for pods to become ready")
|
||||
}
|
||||
|
||||
// TODO(fabxc): delete oldest pod first.
|
||||
if err := podClient.Delete(oldPods[0].Name, nil); err != nil {
|
||||
if err := c.kclient.Core().Pods(a.Namespace).Delete(oldPods[0].Name, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
// If there are further pods that need updating, we enqueue ourselves again.
|
||||
|
@ -415,6 +400,38 @@ func (c *Operator) syncVersion(am *v1alpha1.Alertmanager) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func AlertmanagerStatus(kclient *kubernetes.Clientset, a *v1alpha1.Alertmanager) (*v1alpha1.AlertmanagerStatus, []v1.Pod, error) {
|
||||
res := &v1alpha1.AlertmanagerStatus{Paused: a.Spec.Paused}
|
||||
|
||||
pods, err := kclient.Core().Pods(a.Namespace).List(ListOptions(a.Name))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
res.Replicas = int32(len(pods.Items))
|
||||
|
||||
var oldPods []v1.Pod
|
||||
for _, pod := range pods.Items {
|
||||
ready, err := k8sutil.PodRunningAndReady(pod)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot determine pod ready state: %s", err)
|
||||
}
|
||||
if ready {
|
||||
res.AvailableReplicas++
|
||||
// TODO(fabxc): detect other fields of the pod template that are mutable.
|
||||
if strings.HasSuffix(pod.Spec.Containers[0].Image, a.Spec.Version) {
|
||||
res.UpdatedReplicas++
|
||||
} else {
|
||||
oldPods = append(oldPods, pod)
|
||||
}
|
||||
continue
|
||||
}
|
||||
res.UnavailableReplicas++
|
||||
}
|
||||
|
||||
return res, oldPods, nil
|
||||
}
|
||||
|
||||
func (c *Operator) destroyAlertmanager(key string) error {
|
||||
obj, exists, err := c.ssetInf.GetStore().GetByKey(key)
|
||||
if err != nil {
|
||||
|
|
|
@ -146,7 +146,8 @@ type ServiceMonitorList struct {
|
|||
type Alertmanager struct {
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
v1.ObjectMeta `json:"metadata,omitempty"`
|
||||
Spec AlertmanagerSpec `json:"spec"`
|
||||
Spec AlertmanagerSpec `json:"spec"`
|
||||
Status *AlertmanagerStatus `json:"status"`
|
||||
}
|
||||
|
||||
type AlertmanagerSpec struct {
|
||||
|
@ -168,6 +169,9 @@ type AlertmanagerSpec struct {
|
|||
// served by Alertmanager. If omitted, relevant URL components will be
|
||||
// derived automatically.
|
||||
ExternalURL string `json:"externalUrl,omitempty"`
|
||||
// If set to true all actions on the underlaying managed objects are not
|
||||
// goint to be performed, except for delete actions.
|
||||
Paused bool `json:"paused"`
|
||||
}
|
||||
|
||||
type AlertmanagerList struct {
|
||||
|
@ -179,6 +183,27 @@ type AlertmanagerList struct {
|
|||
Items []Alertmanager `json:"items"`
|
||||
}
|
||||
|
||||
type AlertmanagerStatus struct {
|
||||
// Represents whether any actions on the underlaying managed objects are
|
||||
// being performed. Only delete actions will be performed.
|
||||
Paused bool `json:"paused"`
|
||||
|
||||
// Total number of non-terminated pods targeted by this Alertmanager
|
||||
// cluster (their labels match the selector).
|
||||
Replicas int32 `json:"replicas"`
|
||||
|
||||
// Total number of non-terminated pods targeted by this Alertmanager
|
||||
// cluster that have the desired version spec.
|
||||
UpdatedReplicas int32 `json:"updatedReplicas"`
|
||||
|
||||
// Total number of available pods (ready for at least minReadySeconds)
|
||||
// targeted by this Alertmanager cluster.
|
||||
AvailableReplicas int32 `json:"availableReplicas"`
|
||||
|
||||
// Total number of unavailable pods targeted by this Alertmanager cluster.
|
||||
UnavailableReplicas int32 `json:"unavailableReplicas"`
|
||||
}
|
||||
|
||||
type NamespaceSelector struct {
|
||||
Any bool `json:"any,omitempty"`
|
||||
MatchNames []string `json:"matchNames,omitempty"`
|
||||
|
|
|
@ -490,7 +490,7 @@ func (c *Operator) syncVersion(key string, p *v1alpha1.Prometheus) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func PrometheusStatus(kclient *kubernetes.Clientset, p *v1alpha1.Prometheus) (*v1alpha1.PrometheusStatus, []*v1.Pod, error) {
|
||||
func PrometheusStatus(kclient *kubernetes.Clientset, p *v1alpha1.Prometheus) (*v1alpha1.PrometheusStatus, []v1.Pod, error) {
|
||||
res := &v1alpha1.PrometheusStatus{Paused: p.Spec.Paused}
|
||||
|
||||
pods, err := kclient.Core().Pods(p.Namespace).List(ListOptions(p.Name))
|
||||
|
@ -500,7 +500,7 @@ func PrometheusStatus(kclient *kubernetes.Clientset, p *v1alpha1.Prometheus) (*v
|
|||
|
||||
res.Replicas = int32(len(pods.Items))
|
||||
|
||||
var oldPods []*v1.Pod
|
||||
var oldPods []v1.Pod
|
||||
for _, pod := range pods.Items {
|
||||
ready, err := k8sutil.PodRunningAndReady(pod)
|
||||
if err != nil {
|
||||
|
@ -512,7 +512,7 @@ func PrometheusStatus(kclient *kubernetes.Clientset, p *v1alpha1.Prometheus) (*v
|
|||
if strings.HasSuffix(pod.Spec.Containers[0].Image, p.Spec.Version) {
|
||||
res.UpdatedReplicas++
|
||||
} else {
|
||||
oldPods = append(oldPods, &pod)
|
||||
oldPods = append(oldPods, pod)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -54,28 +54,28 @@ func TestAlertmanagerScaling(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
//func TestAlertmanagerVersionMigration(t *testing.T) {
|
||||
// name := "alertmanager-test"
|
||||
//
|
||||
// defer func() {
|
||||
// if err := framework.DeleteAlertmanagerAndWaitUntilGone(name); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
// }()
|
||||
//
|
||||
// am := framework.MakeBasicAlertmanager(name, 3)
|
||||
// am.Spec.Version = "v0.5.0"
|
||||
// if err := framework.CreateAlertmanagerAndWaitUntilReady(am); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
//
|
||||
// am.Spec.Version = "v0.5.1"
|
||||
// if err := framework.UpdateAlertmanagerAndWaitUntilReady(am); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
//
|
||||
// am.Spec.Version = "v0.5.0"
|
||||
// if err := framework.UpdateAlertmanagerAndWaitUntilReady(am); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
//}
|
||||
func TestAlertmanagerVersionMigration(t *testing.T) {
|
||||
name := "alertmanager-test"
|
||||
|
||||
defer func() {
|
||||
if err := framework.DeleteAlertmanagerAndWaitUntilGone(name); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
am := framework.MakeBasicAlertmanager(name, 3)
|
||||
am.Spec.Version = "v0.5.0"
|
||||
if err := framework.CreateAlertmanagerAndWaitUntilReady(am); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
am.Spec.Version = "v0.5.1"
|
||||
if err := framework.UpdateAlertmanagerAndWaitUntilReady(am); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
am.Spec.Version = "v0.5.0"
|
||||
if err := framework.UpdateAlertmanagerAndWaitUntilReady(am); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -98,7 +98,7 @@ func (f *Framework) CreateAlertmanagerAndWaitUntilReady(a *v1alpha1.Alertmanager
|
|||
return err
|
||||
}
|
||||
|
||||
_, err = f.WaitForPodsReady(time.Minute*2, int(a.Spec.Replicas), amImage(a.Spec.Version), alertmanager.ListOptions(a.Name))
|
||||
_, err = f.WaitForPodsReady(time.Minute*6, int(a.Spec.Replicas), amImage(a.Spec.Version), alertmanager.ListOptions(a.Name))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create an Alertmanager cluster (%s) with %d instances: %v", a.Name, a.Spec.Replicas, err)
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ func (f *Framework) UpdateAlertmanagerAndWaitUntilReady(a *v1alpha1.Alertmanager
|
|||
return err
|
||||
}
|
||||
|
||||
_, err = f.WaitForPodsReady(time.Minute*2, int(a.Spec.Replicas), amImage(a.Spec.Version), alertmanager.ListOptions(a.Name))
|
||||
_, err = f.WaitForPodsReady(time.Minute*6, int(a.Spec.Replicas), amImage(a.Spec.Version), alertmanager.ListOptions(a.Name))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update %d Alertmanager instances (%s): %v", a.Spec.Replicas, a.Name, err)
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ func (f *Framework) DeleteAlertmanagerAndWaitUntilGone(name string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if _, err := f.WaitForPodsReady(time.Minute*2, 0, amImage(a.Spec.Version), alertmanager.ListOptions(name)); err != nil {
|
||||
if _, err := f.WaitForPodsReady(time.Minute*6, 0, amImage(a.Spec.Version), alertmanager.ListOptions(name)); err != nil {
|
||||
return fmt.Errorf("failed to teardown Alertmanager (%s) instances: %v", name, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -118,7 +118,7 @@ func (f *Framework) CreatePrometheusAndWaitUntilReady(p *v1alpha1.Prometheus) er
|
|||
return err
|
||||
}
|
||||
|
||||
_, err = f.WaitForPodsReady(time.Minute*2, int(p.Spec.Replicas), promImage(p.Spec.Version), prometheus.ListOptions(p.Name))
|
||||
_, err = f.WaitForPodsReady(time.Minute*6, int(p.Spec.Replicas), promImage(p.Spec.Version), prometheus.ListOptions(p.Name))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create %d Prometheus instances (%s): %v", p.Spec.Replicas, p.Name, err)
|
||||
}
|
||||
|
@ -133,7 +133,7 @@ func (f *Framework) UpdatePrometheusAndWaitUntilReady(p *v1alpha1.Prometheus) er
|
|||
return err
|
||||
}
|
||||
|
||||
_, err = f.WaitForPodsReady(time.Minute*2, int(p.Spec.Replicas), promImage(p.Spec.Version), prometheus.ListOptions(p.Name))
|
||||
_, err = f.WaitForPodsReady(time.Minute*6, int(p.Spec.Replicas), promImage(p.Spec.Version), prometheus.ListOptions(p.Name))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update %d Prometheus instances (%s): %v", p.Spec.Replicas, p.Name, err)
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ func (f *Framework) DeletePrometheusAndWaitUntilGone(name string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if _, err := f.WaitForPodsReady(time.Minute*2, 0, promImage(p.Spec.Version), prometheus.ListOptions(name)); err != nil {
|
||||
if _, err := f.WaitForPodsReady(time.Minute*6, 0, promImage(p.Spec.Version), prometheus.ListOptions(name)); err != nil {
|
||||
return fmt.Errorf("failed to teardown Prometheus instances (%s): %v", name, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ func TestPrometheusDiscovery(t *testing.T) {
|
|||
}
|
||||
|
||||
p := framework.MakeBasicPrometheus(prometheusName, group, 1)
|
||||
p.Spec.Version = "v1.5.0"
|
||||
p.Spec.Version = "master"
|
||||
if err := framework.CreatePrometheusAndWaitUntilReady(p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ func TestPrometheusAlertmanagerDiscovery(t *testing.T) {
|
|||
|
||||
p := framework.MakeBasicPrometheus(prometheusName, group, 1)
|
||||
framework.AddAlertingToPrometheus(p, alertmanagerName)
|
||||
p.Spec.Version = "v1.5.0"
|
||||
p.Spec.Version = "master"
|
||||
if err := framework.CreatePrometheusAndWaitUntilReady(p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue