1
0
Fork 0
mirror of https://github.com/prometheus-operator/prometheus-operator.git synced 2025-04-21 11:48:53 +00:00

Switch deployments to petsets

This commit is contained in:
Fabian Reinartz 2016-10-28 16:08:11 +02:00
parent 4d4c3162d4
commit b5644033fe
2 changed files with 110 additions and 69 deletions

View file

@ -2,7 +2,6 @@ package controller
import (
"bytes"
"errors"
"fmt"
"net/http"
"net/url"
@ -17,12 +16,11 @@ import (
apierrors "k8s.io/client-go/1.5/pkg/api/errors"
"k8s.io/client-go/1.5/pkg/api/unversioned"
"k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/1.5/pkg/apis/apps/v1alpha1"
extensionsobj "k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/1.5/pkg/labels"
utilruntime "k8s.io/client-go/1.5/pkg/util/runtime"
"k8s.io/client-go/1.5/pkg/util/wait"
"k8s.io/client-go/1.5/pkg/watch"
"k8s.io/client-go/1.5/rest"
"k8s.io/client-go/1.5/tools/cache"
)
@ -42,7 +40,8 @@ type Controller struct {
promInf cache.SharedIndexInformer
smonInf cache.SharedIndexInformer
cmapInf cache.SharedIndexInformer
deplInf cache.SharedIndexInformer
psetInf cache.SharedIndexInformer
svcInf cache.SharedIndexInformer
queue *queue
@ -109,9 +108,13 @@ func (c *Controller) Run(stopc <-chan struct{}) error {
cache.NewListWatchFromClient(c.kclient.Core().GetRESTClient(), "configmaps", api.NamespaceAll, nil),
&v1.ConfigMap{}, resyncPeriod, cache.Indexers{},
)
c.deplInf = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(c.kclient.Extensions().GetRESTClient(), "deployments", api.NamespaceAll, nil),
&extensionsobj.Deployment{}, resyncPeriod, cache.Indexers{},
c.psetInf = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(c.kclient.Apps().GetRESTClient(), "petsets", api.NamespaceAll, nil),
&v1alpha1.PetSet{}, resyncPeriod, cache.Indexers{},
)
c.svcInf = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(c.kclient.Core().GetRESTClient(), "services", api.NamespaceAll, nil),
&v1.Service{}, resyncPeriod, cache.Indexers{},
)
c.promInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -149,28 +152,29 @@ func (c *Controller) Run(stopc <-chan struct{}) error {
c.enqueueAll()
},
})
c.deplInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
c.psetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO(fabxc): only enqueue Prometheus an affected deployment belonged to.
AddFunc: func(d interface{}) {
c.logger.Log("msg", "addDeployment", "trigger", "depl add")
c.addDeployment(d)
c.addPetSet(d)
},
DeleteFunc: func(d interface{}) {
c.logger.Log("msg", "deleteDeployment", "trigger", "depl delete")
c.deleteDeployment(d)
c.deletePetSet(d)
},
UpdateFunc: func(old, cur interface{}) {
c.logger.Log("msg", "updateDeployment", "trigger", "depl update")
c.updateDeployment(old, cur)
c.updatePetSet(old, cur)
},
})
go c.promInf.Run(stopc)
go c.smonInf.Run(stopc)
go c.cmapInf.Run(stopc)
go c.deplInf.Run(stopc)
go c.psetInf.Run(stopc)
go c.svcInf.Run(stopc)
for !c.promInf.HasSynced() || !c.smonInf.HasSynced() || !c.cmapInf.HasSynced() || !c.deplInf.HasSynced() {
for !c.promInf.HasSynced() || !c.smonInf.HasSynced() || !c.cmapInf.HasSynced() || !c.psetInf.HasSynced() || !c.svcInf.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
@ -220,7 +224,7 @@ func (c *Controller) worker() {
}
}
func (c *Controller) prometheusForDeployment(d *v1beta1.Deployment) *spec.Prometheus {
func (c *Controller) prometheusForDeployment(d *v1alpha1.PetSet) *spec.Prometheus {
key, err := keyFunc(d)
if err != nil {
utilruntime.HandleError(fmt.Errorf("creating key: %s", err))
@ -238,25 +242,25 @@ func (c *Controller) prometheusForDeployment(d *v1beta1.Deployment) *spec.Promet
return p.(*spec.Prometheus)
}
func (c *Controller) deleteDeployment(o interface{}) {
d := o.(*v1beta1.Deployment)
func (c *Controller) deletePetSet(o interface{}) {
d := o.(*v1alpha1.PetSet)
// Wake up Prometheus resource the deployment belongs to.
if p := c.prometheusForDeployment(d); p != nil {
c.enqueuePrometheus(p)
}
}
func (c *Controller) addDeployment(o interface{}) {
d := o.(*v1beta1.Deployment)
func (c *Controller) addPetSet(o interface{}) {
d := o.(*v1alpha1.PetSet)
// Wake up Prometheus resource the deployment belongs to.
if p := c.prometheusForDeployment(d); p != nil {
c.enqueuePrometheus(p)
}
}
func (c *Controller) updateDeployment(oldo, curo interface{}) {
old := oldo.(*v1beta1.Deployment)
cur := curo.(*v1beta1.Deployment)
func (c *Controller) updatePetSet(oldo, curo interface{}) {
old := oldo.(*v1alpha1.PetSet)
cur := curo.(*v1alpha1.PetSet)
c.logger.Log("msg", "update handler", "old", old.ResourceVersion, "cur", cur.ResourceVersion)
@ -304,24 +308,42 @@ func (c *Controller) reconcile(p *spec.Prometheus) error {
}
}
deplClient := c.kclient.ExtensionsClient.Deployments(p.Namespace)
svcClient := c.kclient.Core().Services(p.Namespace)
svcQ := &v1.Service{}
svcQ.Namespace = p.Namespace
svcQ.Name = fmt.Sprintf("%s-petset", p.Name)
obj, exists, err := c.svcInf.GetStore().Get(svcQ)
if err != nil {
return err
}
if !exists {
if _, err := svcClient.Create(makePetSetService(p)); err != nil {
return fmt.Errorf("create petset service: %s", err)
}
}
// if _, err = svcClient.Update(makePetSetService(p)); err != nil {
// return err
// }
psetClient := c.kclient.Apps().PetSets(p.Namespace)
// Ensure we have a replica set running Prometheus deployed.
// XXX: Selecting by ObjectMeta.Name gives an error. So use the label for now.
deplQ := &v1beta1.Deployment{}
deplQ.Namespace = p.Namespace
deplQ.Name = p.Name
obj, exists, err := c.deplInf.GetStore().Get(deplQ)
psetQ := &v1alpha1.PetSet{}
psetQ.Namespace = p.Namespace
psetQ.Name = p.Name
obj, exists, err = c.psetInf.GetStore().Get(psetQ)
if err != nil {
return err
}
if !exists {
if _, err := deplClient.Create(makeDeployment(p, nil)); err != nil {
return fmt.Errorf("create deployment: %s", err)
if _, err := psetClient.Create(makePetSet(p, nil)); err != nil {
return fmt.Errorf("create petset: %s", err)
}
return nil
}
if _, err := deplClient.Update(makeDeployment(p, obj.(*v1beta1.Deployment))); err != nil {
if _, err := psetClient.Update(makePetSet(p, obj.(*v1alpha1.PetSet))); err != nil {
return err
}
return nil
@ -329,58 +351,50 @@ func (c *Controller) reconcile(p *spec.Prometheus) error {
func (c *Controller) deletePrometheus(p *spec.Prometheus) error {
// Update the replica count to 0 and wait for all pods to be deleted.
deplClient := c.kclient.ExtensionsClient.Deployments(p.Namespace)
psetClient := c.kclient.Apps().PetSets(p.Namespace)
key, err := keyFunc(p)
if err != nil {
return err
}
oldDepl, _, err := c.deplInf.GetStore().GetByKey(key)
depl, err := deplClient.Update(makeDeployment(p, oldDepl.(*v1beta1.Deployment)))
oldPsetO, _, err := c.psetInf.GetStore().GetByKey(key)
if err != nil {
return err
}
oldPset := oldPsetO.(*v1alpha1.PetSet)
zero := int32(0)
oldPset.Spec.Replicas = &zero
if _, err := psetClient.Update(oldPset); err != nil {
return err
}
// XXX: Selecting by ObjectMeta.Name gives an error. So use the label for now.
selector, err := labels.Parse("prometheus.coreos.com/type=prometheus,prometheus.coreos.com/name=" + p.Name)
if err != nil {
return err
}
w, err := deplClient.Watch(api.ListOptions{LabelSelector: selector})
if err != nil {
return err
}
if _, err := watch.Until(100*time.Second, w, func(e watch.Event) (bool, error) {
curDepl, ok := e.Object.(*extensionsobj.Deployment)
if !ok {
return false, errors.New("not a replica set")
}
// Check if the replica set is scaled down and all replicas are gone.
if curDepl.Status.ObservedGeneration >= depl.Status.ObservedGeneration && curDepl.Status.Replicas == *curDepl.Spec.Replicas {
return true, nil
}
podClient := c.kclient.Core().Pods(p.Namespace)
switch e.Type {
// Deleted before we could validate it was scaled down correctly.
case watch.Deleted:
return true, errors.New("replica set deleted")
case watch.Error:
return false, errors.New("watch error")
// TODO(fabxc): temprorary solution until PetSet status provides necessary info to know
// whether scale-down completed.
for {
pods, err := podClient.List(api.ListOptions{LabelSelector: selector})
if err != nil {
return err
}
return false, nil
}); err != nil {
return err
if len(pods.Items) == 0 {
break
}
time.Sleep(50 * time.Millisecond)
}
// Deployment scaled down, we can delete it.
if err := deplClient.Delete(p.Name, nil); err != nil {
if err := psetClient.Delete(p.Name, nil); err != nil {
return err
}
// Remove ReplicaSet of the deployment.
rsClient := c.kclient.Extensions().ReplicaSets(p.Namespace)
if err := rsClient.DeleteCollection(&api.DeleteOptions{}, api.ListOptions{
LabelSelector: selector,
}); err != nil {
if err := c.kclient.Core().Services(p.Namespace).Delete(fmt.Sprintf("%s-petset", p.Name), nil); err != nil {
return err
}

View file

@ -5,11 +5,11 @@ import (
"github.com/coreos/kube-prometheus-controller/pkg/spec"
"k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/1.5/pkg/apis/apps/v1alpha1"
"k8s.io/client-go/1.5/pkg/util/intstr"
)
func makeDeployment(p *spec.Prometheus, old *v1beta1.Deployment) *v1beta1.Deployment {
func makePetSet(p *spec.Prometheus, old *v1alpha1.PetSet) *v1alpha1.PetSet {
// TODO(fabxc): is this the right point to inject defaults?
// Ideally we would do it before storing but that's currently not possible.
// Potentially an update handler on first insertion.
@ -28,31 +28,58 @@ func makeDeployment(p *spec.Prometheus, old *v1beta1.Deployment) *v1beta1.Deploy
}
image := fmt.Sprintf("%s:%s", baseImage, version)
depl := &v1beta1.Deployment{
petset := &v1alpha1.PetSet{
ObjectMeta: v1.ObjectMeta{
Name: p.Name,
},
Spec: makeDeploymentSpec(p.Name, image, replicas),
Spec: makePetSetSpec(p.Name, image, replicas),
}
if old != nil {
depl.Annotations = old.Annotations
petset.Annotations = old.Annotations
}
return depl
return petset
}
func makeDeploymentSpec(name, image string, replicas int32) v1beta1.DeploymentSpec {
func makePetSetService(p *spec.Prometheus) *v1.Service {
svc := &v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("%s-petset", p.Name),
},
Spec: v1.ServiceSpec{
Selector: map[string]string{
"prometheus.coreos.com/name": p.Name,
"prometheus.coreos.com/type": "prometheus",
},
ClusterIP: "None",
Ports: []v1.ServicePort{
{
Name: "web",
Port: 9090,
TargetPort: intstr.FromString("web"),
},
},
},
}
return svc
}
func makePetSetSpec(name, image string, replicas int32) v1alpha1.PetSetSpec {
// Prometheus may take quite long to shut down to checkpoint existing data.
// Allow up to 10 minutes for clean termination.
terminationGracePeriod := int64(600)
return v1beta1.DeploymentSpec{
Replicas: &replicas,
return v1alpha1.PetSetSpec{
ServiceName: fmt.Sprintf("%s-petset", name),
Replicas: &replicas,
Template: v1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
"prometheus.coreos.com/name": name,
"prometheus.coreos.com/type": "prometheus",
},
Annotations: map[string]string{
"pod.alpha.kubernetes.io/initialized": "true",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{