1
0
Fork 0
mirror of https://github.com/prometheus-operator/prometheus-operator.git synced 2025-04-21 11:48:53 +00:00

Properly filter deployment updates, fix update resource

This commit is contained in:
Fabian Reinartz 2016-10-27 19:08:25 +02:00
parent 83e3bcaab7
commit 75b2a1886f
3 changed files with 187 additions and 100 deletions

View file

@ -114,6 +114,57 @@ func (c *Controller) Run(stopc <-chan struct{}) error {
&extensionsobj.Deployment{}, resyncPeriod, cache.Indexers{},
)
c.promInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(p interface{}) {
c.logger.Log("msg", "enqueuePrometheus", "trigger", "prom add")
c.enqueuePrometheus(p)
},
DeleteFunc: func(p interface{}) {
c.logger.Log("msg", "enqueuePrometheus", "trigger", "prom del")
c.enqueuePrometheus(p)
},
UpdateFunc: func(_, p interface{}) {
c.logger.Log("msg", "enqueuePrometheus", "trigger", "prom update")
c.enqueuePrometheus(p)
},
})
c.smonInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) {
c.logger.Log("msg", "enqueueAll", "trigger", "smon add")
c.enqueueAll()
},
DeleteFunc: func(_ interface{}) {
c.logger.Log("msg", "enqueueAll", "trigger", "smon del")
c.enqueueAll()
},
UpdateFunc: func(_, _ interface{}) {
c.logger.Log("msg", "enqueueAll", "trigger", "smon update")
c.enqueueAll()
},
})
c.cmapInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO(fabxc): only enqueue Prometheus the ConfigMap belonged to.
DeleteFunc: func(_ interface{}) {
c.logger.Log("msg", "enqueueAll", "trigger", "cmap del")
c.enqueueAll()
},
})
c.deplInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO(fabxc): only enqueue Prometheus an affected deployment belonged to.
AddFunc: func(d interface{}) {
c.logger.Log("msg", "addDeployment", "trigger", "depl add")
c.addDeployment(d)
},
DeleteFunc: func(d interface{}) {
c.logger.Log("msg", "deleteDeployment", "trigger", "depl delete")
c.deleteDeployment(d)
},
UpdateFunc: func(old, cur interface{}) {
c.logger.Log("msg", "updateDeployment", "trigger", "depl update")
c.updateDeployment(old, cur)
},
})
go c.promInf.Run(stopc)
go c.smonInf.Run(stopc)
go c.cmapInf.Run(stopc)
@ -123,36 +174,6 @@ func (c *Controller) Run(stopc <-chan struct{}) error {
time.Sleep(100 * time.Millisecond)
}
c.promInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(p interface{}) {
c.logger.Log("msg", "prominf add")
c.enqueuePrometheus(p)
},
DeleteFunc: func(p interface{}) {
c.logger.Log("msg", "prominf del")
c.enqueuePrometheus(p)
},
UpdateFunc: func(_, p interface{}) {
c.logger.Log("msg", "prominf up")
c.enqueuePrometheus(p)
},
})
c.smonInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) { c.enqueueAll() },
DeleteFunc: func(_ interface{}) { c.enqueueAll() },
UpdateFunc: func(_, _ interface{}) { c.enqueueAll() },
})
c.cmapInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO(fabxc): only enqueue Prometheus the ConfigMap belonged to.
DeleteFunc: func(_ interface{}) { c.enqueueAll() },
})
c.deplInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO(fabxc): only enqueue Prometheus an affected deployment belonged to.
AddFunc: func(_ interface{}) { c.enqueueAll() },
DeleteFunc: func(_ interface{}) { c.enqueueAll() },
UpdateFunc: func(_, _ interface{}) { c.enqueueAll() },
})
<-stopc
return nil
}
@ -199,6 +220,58 @@ func (c *Controller) worker() {
}
}
func (c *Controller) prometheusForDeployment(d *v1beta1.Deployment) *spec.Prometheus {
key, err := keyFunc(d)
if err != nil {
utilruntime.HandleError(fmt.Errorf("creating key: %s", err))
return nil
}
// Namespace/Name are one-to-one so the key will find the respective Prometheus resource.
p, exists, err := c.promInf.GetStore().GetByKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("get Prometheus resource: %s", err))
return nil
}
if !exists {
return nil
}
return p.(*spec.Prometheus)
}
func (c *Controller) deleteDeployment(o interface{}) {
d := o.(*v1beta1.Deployment)
// Wake up Prometheus resource the deployment belongs to.
if p := c.prometheusForDeployment(d); p != nil {
c.enqueuePrometheus(p)
}
}
func (c *Controller) addDeployment(o interface{}) {
d := o.(*v1beta1.Deployment)
// Wake up Prometheus resource the deployment belongs to.
if p := c.prometheusForDeployment(d); p != nil {
c.enqueuePrometheus(p)
}
}
func (c *Controller) updateDeployment(oldo, curo interface{}) {
old := oldo.(*v1beta1.Deployment)
cur := curo.(*v1beta1.Deployment)
c.logger.Log("msg", "update handler", "old", old.ResourceVersion, "cur", cur.ResourceVersion)
// Periodic resync may resend the deployment without changes in-between.
// Also breaks loops created by updating the resource ourselves.
if old.ResourceVersion == cur.ResourceVersion {
return
}
// Wake up Prometheus resource the deployment belongs to.
if p := c.prometheusForDeployment(cur); p != nil {
c.enqueuePrometheus(p)
}
}
func (c *Controller) reconcile(p *spec.Prometheus) error {
key, err := keyFunc(p)
if err != nil {
@ -222,28 +295,29 @@ func (c *Controller) reconcile(p *spec.Prometheus) error {
return c.deletePrometheus(p)
}
// We just always regenerate the configuration to be safe.
if err := c.createConfig(p); err != nil {
return err
}
deplClient := c.kclient.ExtensionsClient.Deployments(p.Namespace)
// Ensure we have a replica set running Prometheus deployed.
// XXX: Selecting by ObjectMeta.Name gives an error. So use the label for now.
deplQ := &v1beta1.Deployment{}
deplQ.Namespace = p.Namespace
deplQ.Name = p.Name
_, exists, err = c.deplInf.GetStore().Get(deplQ)
obj, exists, err := c.deplInf.GetStore().Get(deplQ)
if err != nil {
return err
}
deplClient := c.kclient.ExtensionsClient.Deployments(p.Namespace)
if !exists {
if _, err := deplClient.Create(makeDeployment(p)); err != nil {
if _, err := deplClient.Create(makeDeployment(p, nil)); err != nil {
return fmt.Errorf("create deployment: %s", err)
}
} else if _, err := deplClient.Update(makeDeployment(p)); err != nil {
return fmt.Errorf("update deployment: %s", err)
return nil
}
// We just always regenerate the configuration to be safe.
if err := c.createConfig(p); err != nil {
if _, err := deplClient.Update(makeDeployment(p, obj.(*v1beta1.Deployment))); err != nil {
return err
}
return nil
@ -252,8 +326,13 @@ func (c *Controller) reconcile(p *spec.Prometheus) error {
func (c *Controller) deletePrometheus(p *spec.Prometheus) error {
// Update the replica count to 0 and wait for all pods to be deleted.
deplClient := c.kclient.ExtensionsClient.Deployments(p.Namespace)
// depl := c.kclient.Extensions().Deployments(p.Namespace)
depl, err := deplClient.Update(makeDeployment(p))
key, err := keyFunc(p)
if err != nil {
return err
}
oldDepl, _, err := c.deplInf.GetStore().GetByKey(key)
depl, err := deplClient.Update(makeDeployment(p, oldDepl.(*v1beta1.Deployment)))
if err != nil {
return err
}

View file

@ -8,7 +8,7 @@ import (
"k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"
)
func makeDeployment(p *spec.Prometheus) *v1beta1.Deployment {
func makeDeployment(p *spec.Prometheus, old *v1beta1.Deployment) *v1beta1.Deployment {
// TODO(fabxc): is this the right point to inject defaults?
// Ideally we would do it before storing but that's currently not possible.
// Potentially an update handler on first insertion.
@ -25,69 +25,77 @@ func makeDeployment(p *spec.Prometheus) *v1beta1.Deployment {
if replicas < 1 {
replicas = 1
}
fmt.Println("creating deployment with replicas %q %q", p.Spec.Replicas, replicas)
image := fmt.Sprintf("%s:%s", baseImage, version)
depl := &v1beta1.Deployment{
ObjectMeta: v1.ObjectMeta{
Name: p.Name,
},
Spec: v1beta1.DeploymentSpec{
Replicas: &replicas,
Template: v1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
"prometheus.coreos.com/name": p.Name,
"prometheus.coreos.com/type": "prometheus",
},
Spec: makeDeploymentSpec(p.Name, image, replicas),
}
if old != nil {
depl.Annotations = old.Annotations
}
return depl
}
func makeDeploymentSpec(name, image string, replicas int32) v1beta1.DeploymentSpec {
return v1beta1.DeploymentSpec{
Replicas: &replicas,
Template: v1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
"prometheus.coreos.com/name": name,
"prometheus.coreos.com/type": "prometheus",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "prometheus",
Image: fmt.Sprintf("%s:%s", baseImage, version),
Ports: []v1.ContainerPort{
{
Name: "web",
ContainerPort: 9090,
Protocol: v1.ProtocolTCP,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "prometheus",
Image: image,
Ports: []v1.ContainerPort{
{
Name: "web",
ContainerPort: 9090,
Protocol: v1.ProtocolTCP,
},
Args: []string{
"-storage.local.retention=12h",
"-storage.local.memory-chunks=500000",
"-config.file=/etc/prometheus/prometheus.yaml",
},
Args: []string{
"-storage.local.retention=12h",
"-storage.local.memory-chunks=500000",
"-config.file=/etc/prometheus/prometheus.yaml",
},
VolumeMounts: []v1.VolumeMount{
{
Name: "config-volume",
ReadOnly: true,
MountPath: "/etc/prometheus",
},
VolumeMounts: []v1.VolumeMount{
{
Name: "config-volume",
ReadOnly: true,
MountPath: "/etc/prometheus",
},
},
}, {
Name: "reloader",
Image: "jimmidyson/configmap-reload",
Args: []string{
"-webhook-url=http://localhost:9090/-/reload",
"-volume-dir=/etc/prometheus/",
},
VolumeMounts: []v1.VolumeMount{
{
Name: "config-volume",
ReadOnly: true,
MountPath: "/etc/prometheus",
},
},
}, {
Name: "reloader",
Image: "jimmidyson/configmap-reload",
Args: []string{
"-webhook-url=http://localhost:9090/-/reload",
"-volume-dir=/etc/prometheus/",
},
VolumeMounts: []v1.VolumeMount{
{
Name: "config-volume",
ReadOnly: true,
MountPath: "/etc/prometheus",
},
},
},
Volumes: []v1.Volume{
{
Name: "config-volume",
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: p.Name,
},
},
Volumes: []v1.Volume{
{
Name: "config-volume",
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: name,
},
},
},
@ -96,5 +104,4 @@ func makeDeployment(p *spec.Prometheus) *v1beta1.Deployment {
},
},
}
return depl
}

View file

@ -23,11 +23,12 @@ type PrometheusList struct {
// PrometheusSpec holds specification parameters of a Prometheus deployment.
type PrometheusSpec struct {
ServiceMonitors []ServiceMonitorSelection `json:"serviceMonitors"`
EvaluationInterval string `json:"evaluationInterval"`
Version string `json:"version"`
BaseImage string `json:"baseImage"`
Replicas int32 `json:"replicas"`
ServiceMonitors []ServiceMonitorSelection `json:"serviceMonitors"`
EvaluationInterval string `json:"evaluationInterval"`
Version string `json:"version"`
BaseImage string `json:"baseImage"`
Replicas int32 `json:"replicas"`
StorageVolumes v1.PersistentVolumeClaimVolumeSource `json:"storageVolumes"`
// Retention string `json:"retention"`
// Replicas int `json:"replicas"`
// Resources apiV1.ResourceRequirements `json:"resources"`