1
0
Fork 0
mirror of https://github.com/prometheus-operator/prometheus-operator.git synced 2025-04-21 03:38:43 +00:00

*: use versioned monitoring api

This commit is contained in:
Frederic Branczyk 2017-01-06 18:00:01 +01:00
parent 18a5ba18e4
commit 77a2dc4f39
No known key found for this signature in database
GPG key ID: CA14788B1E48B256
11 changed files with 98 additions and 462 deletions

View file

@ -1,86 +0,0 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package alertmanager
import (
"encoding/json"
"time"
"github.com/coreos/prometheus-operator/pkg/spec"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
const resyncPeriod = 5 * time.Minute
type alertmanagerDecoder struct {
dec *json.Decoder
close func() error
}
func (d *alertmanagerDecoder) Close() {
d.close()
}
func (d *alertmanagerDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object spec.Alertmanager
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}
// NewAlertmanagerListWatch returns a new ListWatch on the Alertmanager resource.
func NewAlertmanagerListWatch(client *rest.RESTClient) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: cache.ListFunc(func(options v1.ListOptions) (runtime.Object, error) {
req := client.Get().
Namespace(api.NamespaceAll).
Resource("alertmanagers").
// VersionedParams(&options, api.ParameterCodec)
FieldsSelectorParam(nil)
b, err := req.DoRaw()
if err != nil {
return nil, err
}
var p spec.AlertmanagerList
return &p, json.Unmarshal(b, &p)
}),
WatchFunc: cache.WatchFunc(func(options v1.ListOptions) (watch.Interface, error) {
r, err := client.Get().
Prefix("watch").
Namespace(api.NamespaceAll).
Resource("alertmanagers").
// VersionedParams(&options, api.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&alertmanagerDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
}),
}
}

View file

@ -20,10 +20,10 @@ import (
"time"
"github.com/coreos/prometheus-operator/pkg/analytics"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/prometheus"
"github.com/coreos/prometheus-operator/pkg/queue"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/go-kit/kit/log"
"k8s.io/client-go/kubernetes"
@ -36,24 +36,20 @@ import (
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/labels"
utilruntime "k8s.io/client-go/pkg/util/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
const (
TPRGroup = "monitoring.coreos.com"
TPRVersion = "v1alpha1"
tprAlertmanager = "alertmanager." + v1alpha1.TPRGroup
TPRAlertmanagersKind = "alertmanagers"
tprAlertmanager = "alertmanager." + TPRGroup
resyncPeriod = 5 * time.Minute
)
// Operator manages lify cycle of Alertmanager deployments and
// monitoring configurations.
type Operator struct {
kclient *kubernetes.Clientset
pclient *rest.RESTClient
mclient *v1alpha1.MonitoringV1alpha1Client
logger log.Logger
alrtInf cache.SharedIndexInformer
@ -75,14 +71,14 @@ func New(c prometheus.Config, logger log.Logger) (*Operator, error) {
return nil, err
}
promclient, err := prometheus.NewPrometheusRESTClient(*cfg)
mclient, err := v1alpha1.NewForConfig(cfg)
if err != nil {
return nil, err
}
return &Operator{
kclient: client,
pclient: promclient,
mclient: mclient,
logger: logger,
queue: queue.New(),
host: cfg.Host,
@ -121,8 +117,11 @@ func (c *Operator) Run(stopc <-chan struct{}) error {
}
c.alrtInf = cache.NewSharedIndexInformer(
NewAlertmanagerListWatch(c.pclient),
&spec.Alertmanager{}, resyncPeriod, cache.Indexers{},
&cache.ListWatch{
ListFunc: c.mclient.Alertmanagers(api.NamespaceAll).List,
WatchFunc: c.mclient.Alertmanagers(api.NamespaceAll).Watch,
},
&v1alpha1.Alertmanager{}, resyncPeriod, cache.Indexers{},
)
c.ssetInf = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(c.kclient.Apps().RESTClient(), "statefulsets", api.NamespaceAll, nil),
@ -191,7 +190,7 @@ func (c *Operator) enqueue(obj interface{}) {
// enqueueForNamespace enqueues all Alertmanager object keys that belong to the given namespace.
func (c *Operator) enqueueForNamespace(ns string) {
cache.ListAll(c.alrtInf.GetStore(), labels.Everything(), func(obj interface{}) {
am := obj.(*spec.Alertmanager)
am := obj.(*v1alpha1.Alertmanager)
if am.Namespace == ns {
c.enqueue(am)
}
@ -223,7 +222,7 @@ func (c *Operator) worker() {
}
}
func (c *Operator) alertmanagerForStatefulSet(ps interface{}) *spec.Alertmanager {
func (c *Operator) alertmanagerForStatefulSet(ps interface{}) *v1alpha1.Alertmanager {
key, ok := c.keyFunc(ps)
if !ok {
return nil
@ -237,7 +236,7 @@ func (c *Operator) alertmanagerForStatefulSet(ps interface{}) *spec.Alertmanager
if !exists {
return nil
}
return a.(*spec.Alertmanager)
return a.(*v1alpha1.Alertmanager)
}
func (c *Operator) handleAlertmanagerAdd(obj interface{}) {
@ -319,7 +318,7 @@ func (c *Operator) sync(key string) error {
return c.destroyAlertmanager(key)
}
am := obj.(*spec.Alertmanager)
am := obj.(*v1alpha1.Alertmanager)
c.logger.Log("msg", "sync alertmanager", "key", key)
@ -363,7 +362,7 @@ func ListOptions(name string) v1.ListOptions {
// create new pods.
//
// TODO(fabxc): remove this once the StatefulSet controller learns how to do rolling updates.
func (c *Operator) syncVersion(am *spec.Alertmanager) error {
func (c *Operator) syncVersion(am *v1alpha1.Alertmanager) error {
podClient := c.kclient.Core().Pods(am.Namespace)
pods, err := podClient.List(ListOptions(am.Name))
@ -464,7 +463,7 @@ func (c *Operator) createTPRs() error {
Name: tprAlertmanager,
},
Versions: []extensionsobj.APIVersion{
{Name: TPRVersion},
{Name: v1alpha1.TPRVersion},
},
Description: "Managed Alertmanager cluster",
},
@ -479,5 +478,5 @@ func (c *Operator) createTPRs() error {
}
// We have to wait for the TPRs to be ready. Otherwise the initial watch may fail.
return k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), TPRGroup, TPRVersion, TPRAlertmanagersKind)
return k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRAlertmanagerName)
}

View file

@ -24,10 +24,10 @@ import (
"k8s.io/client-go/pkg/apis/apps/v1beta1"
"k8s.io/client-go/pkg/util/intstr"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
)
func makeStatefulSet(am *spec.Alertmanager, old *v1beta1.StatefulSet) *v1beta1.StatefulSet {
func makeStatefulSet(am *v1alpha1.Alertmanager, old *v1beta1.StatefulSet) *v1beta1.StatefulSet {
// TODO(fabxc): is this the right point to inject defaults?
// Ideally we would do it before storing but that's currently not possible.
// Potentially an update handler on first insertion.
@ -80,7 +80,7 @@ func makeStatefulSet(am *spec.Alertmanager, old *v1beta1.StatefulSet) *v1beta1.S
return statefulset
}
func makeStatefulSetService(p *spec.Alertmanager) *v1.Service {
func makeStatefulSetService(p *v1alpha1.Alertmanager) *v1.Service {
svc := &v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: "alertmanager",
@ -109,7 +109,7 @@ func makeStatefulSetService(p *spec.Alertmanager) *v1.Service {
return svc
}
func makeStatefulSetSpec(a *spec.Alertmanager) v1beta1.StatefulSetSpec {
func makeStatefulSetSpec(a *v1alpha1.Alertmanager) v1beta1.StatefulSetSpec {
image := fmt.Sprintf("%s:%s", a.Spec.BaseImage, a.Spec.Version)
commands := []string{
@ -219,7 +219,7 @@ func makeStatefulSetSpec(a *spec.Alertmanager) v1beta1.StatefulSetSpec {
}
}
func subPathForStorage(s *spec.StorageSpec) string {
func subPathForStorage(s *v1alpha1.StorageSpec) string {
if s == nil {
return ""
}

View file

@ -22,13 +22,14 @@ import (
"github.com/go-kit/kit/log"
"k8s.io/client-go/kubernetes"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/prometheus"
)
type API struct {
kclient *kubernetes.Clientset
pclient *prometheus.MonitoringClient
mclient *v1alpha1.MonitoringV1alpha1Client
logger log.Logger
}
@ -43,14 +44,14 @@ func New(conf prometheus.Config, l log.Logger) (*API, error) {
return nil, err
}
pclient, err := prometheus.NewForConfig(cfg)
mclient, err := v1alpha1.NewForConfig(cfg)
if err != nil {
return nil, err
}
return &API{
kclient: kclient,
pclient: pclient,
mclient: mclient,
logger: l,
}, nil
}
@ -94,7 +95,7 @@ func parsePrometheusStatusUrl(path string) objectReference {
func (api *API) prometheusStatus(w http.ResponseWriter, req *http.Request) {
or := parsePrometheusStatusUrl(req.URL.Path)
p, err := api.pclient.Prometheuses(or.namespace).Get(or.name)
p, err := api.mclient.Prometheuses(or.namespace).Get(or.name)
if err != nil {
if k8sutil.IsResourceNotFoundError(err) {
w.WriteHeader(404)

View file

@ -1,204 +0,0 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"encoding/json"
"time"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/runtime/schema"
"k8s.io/client-go/pkg/runtime/serializer"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"github.com/coreos/prometheus-operator/pkg/spec"
)
const resyncPeriod = 5 * time.Minute
type MonitoringClient struct {
*rest.RESTClient
}
func (c *MonitoringClient) Prometheuses(namespace string) PrometheusInterface {
return newPrometheuses(c, namespace)
}
func NewForConfig(c *rest.Config) (*MonitoringClient, error) {
client, err := NewPrometheusRESTClient(*c)
if err != nil {
return nil, err
}
return &MonitoringClient{client}, nil
}
func NewPrometheusRESTClient(c rest.Config) (*rest.RESTClient, error) {
c.APIPath = "/apis"
c.GroupVersion = &schema.GroupVersion{
Group: "monitoring.coreos.com",
Version: "v1alpha1",
}
// TODO(fabxc): is this even used with our custom list/watch functions?
c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
return rest.RESTClientFor(&c)
}
type PrometheusInterface interface {
Get(name string) (*spec.Prometheus, error)
}
type prometheuses struct {
client *MonitoringClient
ns string
}
func newPrometheuses(c *MonitoringClient, namespace string) *prometheuses {
return &prometheuses{
client: c,
ns: namespace,
}
}
func (p *prometheuses) Get(name string) (result *spec.Prometheus, err error) {
result = &spec.Prometheus{}
req := p.client.Get().
Namespace(p.ns).
Resource("prometheuses").
Name(name)
body, err := req.DoRaw()
if err != nil {
return nil, err
}
err = json.Unmarshal(body, &result)
return
}
type prometheusDecoder struct {
dec *json.Decoder
close func() error
}
func (d *prometheusDecoder) Close() {
d.close()
}
func (d *prometheusDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object spec.Prometheus
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}
// NewPrometheusListWatch returns a new ListWatch on the Prometheus resource.
func NewPrometheusListWatch(client *rest.RESTClient) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: cache.ListFunc(func(options v1.ListOptions) (runtime.Object, error) {
req := client.Get().
Namespace(v1.NamespaceAll).
Resource("prometheuses").
// VersionedParams(&options, v1.ParameterCodec)
FieldsSelectorParam(nil)
b, err := req.DoRaw()
if err != nil {
return nil, err
}
var p spec.PrometheusList
return &p, json.Unmarshal(b, &p)
}),
WatchFunc: cache.WatchFunc(func(options v1.ListOptions) (watch.Interface, error) {
r, err := client.Get().
Prefix("watch").
Namespace(v1.NamespaceAll).
Resource("prometheuses").
// VersionedParams(&options, v1.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&prometheusDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
}),
}
}
type serviceMonitorDecoder struct {
dec *json.Decoder
close func() error
}
func (d *serviceMonitorDecoder) Close() {
d.close()
}
func (d *serviceMonitorDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object spec.ServiceMonitor
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}
// NewServiceMonitorListWatch returns a new ListWatch on the ServiceMonitor resource.
func NewServiceMonitorListWatch(client *rest.RESTClient) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: cache.ListFunc(func(options v1.ListOptions) (runtime.Object, error) {
req := client.Get().
Namespace(v1.NamespaceAll).
Resource("servicemonitors").
// VersionedParams(&options, v1.ParameterCodec)
FieldsSelectorParam(nil)
b, err := req.DoRaw()
if err != nil {
return nil, err
}
var sm spec.ServiceMonitorList
return &sm, json.Unmarshal(b, &sm)
}),
WatchFunc: cache.WatchFunc(func(options v1.ListOptions) (watch.Interface, error) {
r, err := client.Get().
Prefix("watch").
Namespace(v1.NamespaceAll).
Resource("servicemonitors").
// VersionedParams(&options, v1.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&serviceMonitorDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
}),
}
}

View file

@ -20,9 +20,9 @@ import (
"time"
"github.com/coreos/prometheus-operator/pkg/analytics"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/queue"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/go-kit/kit/log"
"k8s.io/client-go/kubernetes"
@ -41,21 +41,17 @@ import (
)
const (
TPRGroup = "monitoring.coreos.com"
TPRVersion = "v1alpha1"
tprServiceMonitor = "service-monitor." + v1alpha1.TPRGroup
tprPrometheus = "prometheus." + v1alpha1.TPRGroup
TPRPrometheusesKind = "prometheuses"
TPRServiceMonitorsKind = "servicemonitors"
tprServiceMonitor = "service-monitor." + TPRGroup
tprPrometheus = "prometheus." + TPRGroup
resyncPeriod = 5 * time.Minute
)
// Operator manages lify cycle of Prometheus deployments and
// monitoring configurations.
type Operator struct {
kclient *kubernetes.Clientset
pclient *rest.RESTClient
mclient *v1alpha1.MonitoringV1alpha1Client
logger log.Logger
promInf cache.SharedIndexInformer
@ -86,25 +82,31 @@ func New(conf Config, logger log.Logger) (*Operator, error) {
return nil, err
}
promclient, err := NewPrometheusRESTClient(*cfg)
mclient, err := v1alpha1.NewForConfig(cfg)
if err != nil {
return nil, err
}
c := &Operator{
kclient: client,
pclient: promclient,
mclient: mclient,
logger: logger,
queue: queue.New(),
host: cfg.Host,
}
c.promInf = cache.NewSharedIndexInformer(
NewPrometheusListWatch(c.pclient),
&spec.Prometheus{}, resyncPeriod, cache.Indexers{},
&cache.ListWatch{
ListFunc: mclient.Prometheuses(api.NamespaceAll).List,
WatchFunc: mclient.Prometheuses(api.NamespaceAll).Watch,
},
&v1alpha1.Prometheus{}, resyncPeriod, cache.Indexers{},
)
c.smonInf = cache.NewSharedIndexInformer(
NewServiceMonitorListWatch(c.pclient),
&spec.ServiceMonitor{}, resyncPeriod, cache.Indexers{},
&cache.ListWatch{
ListFunc: mclient.ServiceMonitors(api.NamespaceAll).List,
WatchFunc: mclient.ServiceMonitors(api.NamespaceAll).Watch,
},
&v1alpha1.ServiceMonitor{}, resyncPeriod, cache.Indexers{},
)
c.cmapInf = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(c.kclient.Core().RESTClient(), "configmaps", api.NamespaceAll, nil),
@ -296,7 +298,7 @@ func (c *Operator) enqueue(obj interface{}) {
// enqueueForNamespace enqueues all Prometheus object keys that belong to the given namespace.
func (c *Operator) enqueueForNamespace(ns string) {
cache.ListAll(c.promInf.GetStore(), labels.Everything(), func(obj interface{}) {
p := obj.(*spec.Prometheus)
p := obj.(*v1alpha1.Prometheus)
if p.Namespace == ns {
c.enqueue(p)
}
@ -328,7 +330,7 @@ func (c *Operator) worker() {
}
}
func (c *Operator) prometheusForStatefulSet(ps interface{}) *spec.Prometheus {
func (c *Operator) prometheusForStatefulSet(ps interface{}) *v1alpha1.Prometheus {
key, ok := c.keyFunc(ps)
if !ok {
return nil
@ -342,7 +344,7 @@ func (c *Operator) prometheusForStatefulSet(ps interface{}) *spec.Prometheus {
if !exists {
return nil
}
return p.(*spec.Prometheus)
return p.(*v1alpha1.Prometheus)
}
func (c *Operator) handleDeleteStatefulSet(obj interface{}) {
@ -391,7 +393,7 @@ func (c *Operator) sync(key string) error {
return c.destroyPrometheus(key)
}
p := obj.(*spec.Prometheus)
p := obj.(*v1alpha1.Prometheus)
if p.Spec.Paused {
return nil
}
@ -456,7 +458,7 @@ func ListOptions(name string) v1.ListOptions {
// create new pods.
//
// TODO(fabxc): remove this once the StatefulSet controller learns how to do rolling updates.
func (c *Operator) syncVersion(key string, p *spec.Prometheus) error {
func (c *Operator) syncVersion(key string, p *v1alpha1.Prometheus) error {
status, oldPods, err := PrometheusStatus(c.kclient, p)
if err != nil {
return err
@ -488,8 +490,8 @@ func (c *Operator) syncVersion(key string, p *spec.Prometheus) error {
return nil
}
func PrometheusStatus(kclient *kubernetes.Clientset, p *spec.Prometheus) (*spec.PrometheusStatus, []*v1.Pod, error) {
res := &spec.PrometheusStatus{}
func PrometheusStatus(kclient *kubernetes.Clientset, p *v1alpha1.Prometheus) (*v1alpha1.PrometheusStatus, []*v1.Pod, error) {
res := &v1alpha1.PrometheusStatus{}
pods, err := kclient.Core().Pods(p.Namespace).List(ListOptions(p.Name))
if err != nil {
@ -572,7 +574,7 @@ func (c *Operator) destroyPrometheus(key string) error {
return nil
}
func (c *Operator) createConfig(p *spec.Prometheus) error {
func (c *Operator) createConfig(p *v1alpha1.Prometheus) error {
smons, err := c.selectServiceMonitors(p)
if err != nil {
return err
@ -603,9 +605,9 @@ func (c *Operator) createConfig(p *spec.Prometheus) error {
return err
}
func (c *Operator) selectServiceMonitors(p *spec.Prometheus) (map[string]*spec.ServiceMonitor, error) {
func (c *Operator) selectServiceMonitors(p *v1alpha1.Prometheus) (map[string]*v1alpha1.ServiceMonitor, error) {
// Selectors might overlap. Deduplicate them along the keyFunc.
res := make(map[string]*spec.ServiceMonitor)
res := make(map[string]*v1alpha1.ServiceMonitor)
selector, err := metav1.LabelSelectorAsSelector(p.Spec.ServiceMonitorSelector)
if err != nil {
@ -617,7 +619,7 @@ func (c *Operator) selectServiceMonitors(p *spec.Prometheus) (map[string]*spec.S
cache.ListAllByNamespace(c.smonInf.GetIndexer(), p.Namespace, selector, func(obj interface{}) {
k, ok := c.keyFunc(obj)
if ok {
res[k] = obj.(*spec.ServiceMonitor)
res[k] = obj.(*v1alpha1.ServiceMonitor)
}
})
@ -631,7 +633,7 @@ func (c *Operator) createTPRs() error {
Name: tprServiceMonitor,
},
Versions: []extensionsobj.APIVersion{
{Name: TPRVersion},
{Name: v1alpha1.TPRVersion},
},
Description: "Prometheus monitoring for a service",
},
@ -640,7 +642,7 @@ func (c *Operator) createTPRs() error {
Name: tprPrometheus,
},
Versions: []extensionsobj.APIVersion{
{Name: TPRVersion},
{Name: v1alpha1.TPRVersion},
},
Description: "Managed Prometheus server",
},
@ -655,9 +657,9 @@ func (c *Operator) createTPRs() error {
}
// We have to wait for the TPRs to be ready. Otherwise the initial watch may fail.
err := k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), TPRGroup, TPRVersion, TPRPrometheusesKind)
err := k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRPrometheusName)
if err != nil {
return err
}
return k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), TPRGroup, TPRVersion, TPRServiceMonitorsKind)
return k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRServiceMonitorName)
}

View file

@ -22,7 +22,7 @@ import (
yaml "gopkg.in/yaml.v2"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
)
var (
@ -33,7 +33,7 @@ func sanitizeLabelName(name string) string {
return invalidLabelCharRE.ReplaceAllString(name, "_")
}
func generateConfig(p *spec.Prometheus, mons map[string]*spec.ServiceMonitor) ([]byte, error) {
func generateConfig(p *v1alpha1.Prometheus, mons map[string]*v1alpha1.ServiceMonitor) ([]byte, error) {
cfg := map[string]interface{}{}
cfg["global"] = map[string]string{
@ -62,7 +62,7 @@ func generateConfig(p *spec.Prometheus, mons map[string]*spec.ServiceMonitor) ([
return yaml.Marshal(cfg)
}
func generateServiceMonitorConfig(m *spec.ServiceMonitor, ep spec.Endpoint, i int) interface{} {
func generateServiceMonitorConfig(m *v1alpha1.ServiceMonitor, ep v1alpha1.Endpoint, i int) interface{} {
cfg := map[string]interface{}{
"job_name": fmt.Sprintf("%s/%s/%d", m.Namespace, m.Name, i),
"kubernetes_sd_configs": []map[string]interface{}{
@ -238,7 +238,7 @@ func generateServiceMonitorConfig(m *spec.ServiceMonitor, ep spec.Endpoint, i in
return cfg
}
func generateAlertmanagerConfig(am spec.AlertmanagerEndpoints) interface{} {
func generateAlertmanagerConfig(am v1alpha1.AlertmanagerEndpoints) interface{} {
if am.Scheme == "" {
am.Scheme = "http"
}

View file

@ -24,10 +24,10 @@ import (
"k8s.io/client-go/pkg/apis/apps/v1beta1"
"k8s.io/client-go/pkg/util/intstr"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
)
func makeStatefulSet(p spec.Prometheus, old *v1beta1.StatefulSet) *v1beta1.StatefulSet {
func makeStatefulSet(p v1alpha1.Prometheus, old *v1beta1.StatefulSet) *v1beta1.StatefulSet {
// TODO(fabxc): is this the right point to inject defaults?
// Ideally we would do it before storing but that's currently not possible.
// Potentially an update handler on first insertion.
@ -109,7 +109,7 @@ func makeEmptyRules(name string) *v1.ConfigMap {
}
}
func makeStatefulSetService(p *spec.Prometheus) *v1.Service {
func makeStatefulSetService(p *v1alpha1.Prometheus) *v1.Service {
svc := &v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: "prometheus",
@ -131,7 +131,7 @@ func makeStatefulSetService(p *spec.Prometheus) *v1.Service {
return svc
}
func makeStatefulSetSpec(p spec.Prometheus) v1beta1.StatefulSetSpec {
func makeStatefulSetSpec(p v1alpha1.Prometheus) v1beta1.StatefulSetSpec {
// Prometheus may take quite long to shut down to checkpoint existing data.
// Allow up to 10 minutes for clean termination.
terminationGracePeriod := int64(600)
@ -291,7 +291,7 @@ func makeStatefulSetSpec(p spec.Prometheus) v1beta1.StatefulSetSpec {
}
}
func subPathForStorage(s *spec.StorageSpec) string {
func subPathForStorage(s *v1alpha1.StorageSpec) string {
if s == nil {
return ""
}

View file

@ -15,17 +15,13 @@
package framework
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/util/yaml"
"github.com/coreos/prometheus-operator/pkg/alertmanager"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
)
var ValidAlertmanagerConfig = `global:
@ -42,58 +38,18 @@ receivers:
- url: 'http://alertmanagerwh:30500/'
`
func (f *Framework) CreateAlertmanager(e *spec.Alertmanager) (*spec.Alertmanager, error) {
b, err := json.Marshal(e)
if err != nil {
return nil, err
}
resp, err := f.HTTPClient.Post(
fmt.Sprintf("%s/apis/monitoring.coreos.com/v1alpha1/namespaces/%s/alertmanagers", f.MasterHost, f.Namespace.Name),
"application/json", bytes.NewReader(b))
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("unexpected status: %v", resp.Status)
}
decoder := yaml.NewYAMLOrJSONDecoder(resp.Body, 100)
res := &spec.Alertmanager{}
if err := decoder.Decode(res); err != nil {
return nil, err
}
return res, nil
}
func (f *Framework) MakeBasicAlertmanager(name string, replicas int32) *spec.Alertmanager {
return &spec.Alertmanager{
func (f *Framework) MakeBasicAlertmanager(name string, replicas int32) *v1alpha1.Alertmanager {
return &v1alpha1.Alertmanager{
ObjectMeta: v1.ObjectMeta{
Name: name,
},
Spec: spec.AlertmanagerSpec{
Spec: v1alpha1.AlertmanagerSpec{
Replicas: replicas,
},
}
}
func (f *Framework) DeleteAlertmanager(name string) error {
req, err := http.NewRequest("DELETE",
fmt.Sprintf("%s/apis/monitoring.coreos.com/v1alpha1/namespaces/%s/alertmanagers/%s", f.MasterHost, f.Namespace.Name, name), nil)
if err != nil {
return err
}
resp, err := f.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status: %v", resp.Status)
}
return nil
}
func (f *Framework) CreateAlertmanagerAndWaitUntilReady(a *spec.Alertmanager) error {
func (f *Framework) CreateAlertmanagerAndWaitUntilReady(a *v1alpha1.Alertmanager) error {
_, err := f.KubeClient.CoreV1().ConfigMaps(f.Namespace.Name).Create(
&v1.ConfigMap{
ObjectMeta: v1.ObjectMeta{
@ -108,7 +64,7 @@ func (f *Framework) CreateAlertmanagerAndWaitUntilReady(a *spec.Alertmanager) er
return err
}
_, err = f.CreateAlertmanager(a)
_, err = f.MonClient.Alertmanagers(f.Namespace.Name).Create(a)
if err != nil {
return err
}
@ -121,7 +77,7 @@ func (f *Framework) CreateAlertmanagerAndWaitUntilReady(a *spec.Alertmanager) er
}
func (f *Framework) DeleteAlertmanagerAndWaitUntilGone(name string) error {
if err := f.DeleteAlertmanager(name); err != nil {
if err := f.MonClient.Alertmanagers(f.Namespace.Name).Delete(name, nil); err != nil {
return err
}

View file

@ -30,12 +30,13 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/prometheus"
)
type Framework struct {
KubeClient kubernetes.Interface
MonClient *v1alpha1.MonitoringV1alpha1Client
HTTPClient *http.Client
MasterHost string
Namespace *v1.Namespace
@ -59,6 +60,11 @@ func New(ns, kubeconfig, opImage string) (*Framework, error) {
return nil, err
}
mclient, err := v1alpha1.NewForConfig(config)
if err != nil {
return nil, err
}
namespace, err := cli.Core().Namespaces().Create(&v1.Namespace{
ObjectMeta: v1.ObjectMeta{
Name: ns,
@ -71,6 +77,7 @@ func New(ns, kubeconfig, opImage string) (*Framework, error) {
f := &Framework{
MasterHost: config.Host,
KubeClient: cli,
MonClient: mclient,
HTTPClient: httpc,
Namespace: namespace,
}
@ -123,12 +130,17 @@ func (f *Framework) setupPrometheusOperator(opImage string) error {
}
f.OperatorPod = &pl.Items[0]
err = k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), prometheus.TPRGroup, prometheus.TPRVersion, prometheus.TPRPrometheusesKind)
err = k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRPrometheusName)
if err != nil {
return err
}
return k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), prometheus.TPRGroup, prometheus.TPRVersion, prometheus.TPRServiceMonitorsKind)
err = k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRServiceMonitorName)
if err != nil {
return err
}
return k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRAlertmanagerName)
}
// Teardown tears down a previously initialized test environment.

View file

@ -15,72 +15,28 @@
package framework
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/util/yaml"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/prometheus"
"github.com/coreos/prometheus-operator/pkg/spec"
)
func (f *Framework) CreatePrometheus(e *spec.Prometheus) (*spec.Prometheus, error) {
b, err := json.Marshal(e)
if err != nil {
return nil, err
}
resp, err := f.HTTPClient.Post(
fmt.Sprintf("%s/apis/monitoring.coreos.com/v1alpha1/namespaces/%s/prometheuses", f.MasterHost, f.Namespace.Name),
"application/json", bytes.NewReader(b))
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("unexpected status: %v", resp.Status)
}
decoder := yaml.NewYAMLOrJSONDecoder(resp.Body, 100)
res := &spec.Prometheus{}
if err := decoder.Decode(res); err != nil {
return nil, err
}
return res, nil
}
func (f *Framework) DeletePrometheus(name string) error {
req, err := http.NewRequest("DELETE",
fmt.Sprintf("%s/apis/monitoring.coreos.com/v1alpha1/namespaces/%s/prometheuses/%s", f.MasterHost, f.Namespace.Name, name), nil)
if err != nil {
return err
}
resp, err := f.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status: %v", resp.Status)
}
return nil
}
func (f *Framework) MakeBasicPrometheus(name string, replicas int32) *spec.Prometheus {
return &spec.Prometheus{
func (f *Framework) MakeBasicPrometheus(name string, replicas int32) *v1alpha1.Prometheus {
return &v1alpha1.Prometheus{
ObjectMeta: v1.ObjectMeta{
Name: name,
},
Spec: spec.PrometheusSpec{
Spec: v1alpha1.PrometheusSpec{
Replicas: replicas,
},
}
}
func (f *Framework) CreatePrometheusAndWaitUntilReady(p *spec.Prometheus) error {
_, err := f.CreatePrometheus(p)
func (f *Framework) CreatePrometheusAndWaitUntilReady(p *v1alpha1.Prometheus) error {
_, err := f.MonClient.Prometheuses(f.Namespace.Name).Create(p)
if err != nil {
return err
}
@ -94,7 +50,7 @@ func (f *Framework) CreatePrometheusAndWaitUntilReady(p *spec.Prometheus) error
}
func (f *Framework) DeletePrometheusAndWaitUntilGone(name string) error {
if err := f.DeletePrometheus(name); err != nil {
if err := f.MonClient.Prometheuses(f.Namespace.Name).Delete(name, nil); err != nil {
return err
}