1
0
Fork 0
mirror of https://github.com/prometheus-operator/prometheus-operator.git synced 2025-04-21 03:38:43 +00:00

Merge pull request from brancz/more-e2e-tests

Versioned monitoring api client
This commit is contained in:
Frederic Branczyk 2017-01-09 09:19:46 +01:00 committed by GitHub
commit 924ef59119
23 changed files with 1452 additions and 433 deletions

View file

@ -2,6 +2,7 @@ all: build
REPO?=quay.io/coreos/prometheus-operator
TAG?=$(shell git rev-parse --short HEAD)
NAMESPACE?=prometheus-operator-e2e-tests-$(shell head /dev/urandom | tr -dc a-z0-9 | head -c 13 ; echo '')
build:
./scripts/check_license.sh
@ -11,8 +12,12 @@ container:
GOOS=linux $(MAKE) build
docker build -t $(REPO):$(TAG) .
e2e: container
go test -v ./test/e2e/ --kubeconfig "$(HOME)/.kube/config" --operator-image=quay.io/coreos/prometheus-operator:$(TAG) --namespace=prometheus-operator-e2e-tests-$(TAG)
e2e-test:
go test -v ./test/e2e/ --kubeconfig "$(HOME)/.kube/config" --operator-image=quay.io/coreos/prometheus-operator:$(TAG) --namespace=$(NAMESPACE)
e2e:
$(MAKE) container
$(MAKE) e2e-test
clean-e2e:
kubectl delete namespace prometheus-operator-e2e-tests

View file

@ -1,86 +0,0 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package alertmanager
import (
"encoding/json"
"time"
"github.com/coreos/prometheus-operator/pkg/spec"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
const resyncPeriod = 5 * time.Minute
type alertmanagerDecoder struct {
dec *json.Decoder
close func() error
}
func (d *alertmanagerDecoder) Close() {
d.close()
}
func (d *alertmanagerDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object spec.Alertmanager
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}
// NewAlertmanagerListWatch returns a new ListWatch on the Alertmanager resource.
func NewAlertmanagerListWatch(client *rest.RESTClient) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: cache.ListFunc(func(options v1.ListOptions) (runtime.Object, error) {
req := client.Get().
Namespace(api.NamespaceAll).
Resource("alertmanagers").
// VersionedParams(&options, api.ParameterCodec)
FieldsSelectorParam(nil)
b, err := req.DoRaw()
if err != nil {
return nil, err
}
var p spec.AlertmanagerList
return &p, json.Unmarshal(b, &p)
}),
WatchFunc: cache.WatchFunc(func(options v1.ListOptions) (watch.Interface, error) {
r, err := client.Get().
Prefix("watch").
Namespace(api.NamespaceAll).
Resource("alertmanagers").
// VersionedParams(&options, api.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&alertmanagerDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
}),
}
}

View file

@ -20,10 +20,10 @@ import (
"time"
"github.com/coreos/prometheus-operator/pkg/analytics"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/prometheus"
"github.com/coreos/prometheus-operator/pkg/queue"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/go-kit/kit/log"
"k8s.io/client-go/kubernetes"
@ -36,24 +36,20 @@ import (
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/labels"
utilruntime "k8s.io/client-go/pkg/util/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
const (
TPRGroup = "monitoring.coreos.com"
TPRVersion = "v1alpha1"
tprAlertmanager = "alertmanager." + v1alpha1.TPRGroup
TPRAlertmanagersKind = "alertmanagers"
tprAlertmanager = "alertmanager." + TPRGroup
resyncPeriod = 5 * time.Minute
)
// Operator manages lify cycle of Alertmanager deployments and
// monitoring configurations.
type Operator struct {
kclient *kubernetes.Clientset
pclient *rest.RESTClient
mclient *v1alpha1.MonitoringV1alpha1Client
logger log.Logger
alrtInf cache.SharedIndexInformer
@ -75,14 +71,14 @@ func New(c prometheus.Config, logger log.Logger) (*Operator, error) {
return nil, err
}
promclient, err := prometheus.NewPrometheusRESTClient(*cfg)
mclient, err := v1alpha1.NewForConfig(cfg)
if err != nil {
return nil, err
}
return &Operator{
kclient: client,
pclient: promclient,
mclient: mclient,
logger: logger,
queue: queue.New(),
host: cfg.Host,
@ -121,8 +117,11 @@ func (c *Operator) Run(stopc <-chan struct{}) error {
}
c.alrtInf = cache.NewSharedIndexInformer(
NewAlertmanagerListWatch(c.pclient),
&spec.Alertmanager{}, resyncPeriod, cache.Indexers{},
&cache.ListWatch{
ListFunc: c.mclient.Alertmanagers(api.NamespaceAll).List,
WatchFunc: c.mclient.Alertmanagers(api.NamespaceAll).Watch,
},
&v1alpha1.Alertmanager{}, resyncPeriod, cache.Indexers{},
)
c.ssetInf = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(c.kclient.Apps().RESTClient(), "statefulsets", api.NamespaceAll, nil),
@ -191,7 +190,7 @@ func (c *Operator) enqueue(obj interface{}) {
// enqueueForNamespace enqueues all Alertmanager object keys that belong to the given namespace.
func (c *Operator) enqueueForNamespace(ns string) {
cache.ListAll(c.alrtInf.GetStore(), labels.Everything(), func(obj interface{}) {
am := obj.(*spec.Alertmanager)
am := obj.(*v1alpha1.Alertmanager)
if am.Namespace == ns {
c.enqueue(am)
}
@ -223,7 +222,7 @@ func (c *Operator) worker() {
}
}
func (c *Operator) alertmanagerForStatefulSet(ps interface{}) *spec.Alertmanager {
func (c *Operator) alertmanagerForStatefulSet(ps interface{}) *v1alpha1.Alertmanager {
key, ok := c.keyFunc(ps)
if !ok {
return nil
@ -237,7 +236,7 @@ func (c *Operator) alertmanagerForStatefulSet(ps interface{}) *spec.Alertmanager
if !exists {
return nil
}
return a.(*spec.Alertmanager)
return a.(*v1alpha1.Alertmanager)
}
func (c *Operator) handleAlertmanagerAdd(obj interface{}) {
@ -319,7 +318,7 @@ func (c *Operator) sync(key string) error {
return c.destroyAlertmanager(key)
}
am := obj.(*spec.Alertmanager)
am := obj.(*v1alpha1.Alertmanager)
c.logger.Log("msg", "sync alertmanager", "key", key)
@ -349,7 +348,7 @@ func (c *Operator) sync(key string) error {
return c.syncVersion(am)
}
func listOptions(name string) v1.ListOptions {
func ListOptions(name string) v1.ListOptions {
return v1.ListOptions{
LabelSelector: fields.SelectorFromSet(fields.Set(map[string]string{
"app": "alertmanager",
@ -363,10 +362,10 @@ func listOptions(name string) v1.ListOptions {
// create new pods.
//
// TODO(fabxc): remove this once the StatefulSet controller learns how to do rolling updates.
func (c *Operator) syncVersion(am *spec.Alertmanager) error {
func (c *Operator) syncVersion(am *v1alpha1.Alertmanager) error {
podClient := c.kclient.Core().Pods(am.Namespace)
pods, err := podClient.List(listOptions(am.Name))
pods, err := podClient.List(ListOptions(am.Name))
if err != nil {
return err
}
@ -439,7 +438,7 @@ func (c *Operator) destroyAlertmanager(key string) error {
// TODO(fabxc): temporary solution until StatefulSet status provides necessary info to know
// whether scale-down completed.
for {
pods, err := podClient.List(listOptions(sset.Name))
pods, err := podClient.List(ListOptions(sset.Name))
if err != nil {
return err
}
@ -464,7 +463,7 @@ func (c *Operator) createTPRs() error {
Name: tprAlertmanager,
},
Versions: []extensionsobj.APIVersion{
{Name: TPRVersion},
{Name: v1alpha1.TPRVersion},
},
Description: "Managed Alertmanager cluster",
},
@ -479,5 +478,5 @@ func (c *Operator) createTPRs() error {
}
// We have to wait for the TPRs to be ready. Otherwise the initial watch may fail.
return k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), TPRGroup, TPRVersion, TPRAlertmanagersKind)
return k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRAlertmanagerName)
}

View file

@ -24,10 +24,10 @@ import (
"k8s.io/client-go/pkg/apis/apps/v1beta1"
"k8s.io/client-go/pkg/util/intstr"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
)
func makeStatefulSet(am *spec.Alertmanager, old *v1beta1.StatefulSet) *v1beta1.StatefulSet {
func makeStatefulSet(am *v1alpha1.Alertmanager, old *v1beta1.StatefulSet) *v1beta1.StatefulSet {
// TODO(fabxc): is this the right point to inject defaults?
// Ideally we would do it before storing but that's currently not possible.
// Potentially an update handler on first insertion.
@ -80,7 +80,7 @@ func makeStatefulSet(am *spec.Alertmanager, old *v1beta1.StatefulSet) *v1beta1.S
return statefulset
}
func makeStatefulSetService(p *spec.Alertmanager) *v1.Service {
func makeStatefulSetService(p *v1alpha1.Alertmanager) *v1.Service {
svc := &v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: "alertmanager",
@ -109,7 +109,7 @@ func makeStatefulSetService(p *spec.Alertmanager) *v1.Service {
return svc
}
func makeStatefulSetSpec(a *spec.Alertmanager) v1beta1.StatefulSetSpec {
func makeStatefulSetSpec(a *v1alpha1.Alertmanager) v1beta1.StatefulSetSpec {
image := fmt.Sprintf("%s:%s", a.Spec.BaseImage, a.Spec.Version)
commands := []string{
@ -219,7 +219,7 @@ func makeStatefulSetSpec(a *spec.Alertmanager) v1beta1.StatefulSetSpec {
}
}
func subPathForStorage(s *spec.StorageSpec) string {
func subPathForStorage(s *v1alpha1.StorageSpec) string {
if s == nil {
return ""
}

View file

@ -22,13 +22,14 @@ import (
"github.com/go-kit/kit/log"
"k8s.io/client-go/kubernetes"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/prometheus"
)
type API struct {
kclient *kubernetes.Clientset
pclient *prometheus.MonitoringClient
mclient *v1alpha1.MonitoringV1alpha1Client
logger log.Logger
}
@ -43,14 +44,14 @@ func New(conf prometheus.Config, l log.Logger) (*API, error) {
return nil, err
}
pclient, err := prometheus.NewForConfig(cfg)
mclient, err := v1alpha1.NewForConfig(cfg)
if err != nil {
return nil, err
}
return &API{
kclient: kclient,
pclient: pclient,
mclient: mclient,
logger: l,
}, nil
}
@ -94,7 +95,7 @@ func parsePrometheusStatusUrl(path string) objectReference {
func (api *API) prometheusStatus(w http.ResponseWriter, req *http.Request) {
or := parsePrometheusStatusUrl(req.URL.Path)
p, err := api.pclient.Prometheuses(or.namespace).Get(or.name)
p, err := api.mclient.Prometheuses(or.namespace).Get(or.name)
if err != nil {
if k8sutil.IsResourceNotFoundError(err) {
w.WriteHeader(404)

View file

@ -0,0 +1,189 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v1alpha1
import (
"encoding/json"
"fmt"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/pkg/api/v1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/rest"
)
const (
TPRAlertmanagersKind = "Alertmanager"
TPRAlertmanagerName = "alertmanagers"
)
type AlertmanagersGetter interface {
Alertmanagers(namespace string) AlertmanagerInterface
}
type AlertmanagerInterface interface {
Create(*Alertmanager) (*Alertmanager, error)
Get(name string) (*Alertmanager, error)
Update(*Alertmanager) (*Alertmanager, error)
Delete(name string, options *v1.DeleteOptions) error
List(opts v1.ListOptions) (runtime.Object, error)
Watch(opts v1.ListOptions) (watch.Interface, error)
}
type alertmanagers struct {
restClient rest.Interface
client *dynamic.ResourceClient
ns string
}
func newAlertmanagers(r rest.Interface, c *dynamic.Client, namespace string) *alertmanagers {
return &alertmanagers{
r,
c.Resource(
&metav1.APIResource{
Kind: TPRAlertmanagersKind,
Name: TPRAlertmanagerName,
Namespaced: true,
},
namespace,
),
namespace,
}
}
func (a *alertmanagers) Create(o *Alertmanager) (*Alertmanager, error) {
ua, err := UnstructuredFromAlertmanager(o)
if err != nil {
return nil, fmt.Errorf("unstructuring alertmanager failed: %s", err)
}
ua, err = a.client.Create(ua)
if err != nil {
return nil, fmt.Errorf("creating alertmanager failed: %s", err)
}
return AlertmanagerFromUnstructured(ua)
}
func (a *alertmanagers) Get(name string) (*Alertmanager, error) {
obj, err := a.client.Get(name)
if err != nil {
return nil, fmt.Errorf("retrieving alertmanager failed: %s", err)
}
return AlertmanagerFromUnstructured(obj)
}
func (a *alertmanagers) Update(o *Alertmanager) (*Alertmanager, error) {
ua, err := UnstructuredFromAlertmanager(o)
if err != nil {
return nil, fmt.Errorf("unstructuring alertmanager failed: %s", err)
}
ua, err = a.client.Update(ua)
if err != nil {
return nil, fmt.Errorf("updating alertmanager failed: %s", err)
}
return AlertmanagerFromUnstructured(ua)
}
func (a *alertmanagers) Delete(name string, options *v1.DeleteOptions) error {
return a.client.Delete(name, options)
}
func (a *alertmanagers) List(opts v1.ListOptions) (runtime.Object, error) {
req := a.restClient.Get().
Namespace(a.ns).
Resource("alertmanagers").
// VersionedParams(&options, api.ParameterCodec)
FieldsSelectorParam(nil)
b, err := req.DoRaw()
if err != nil {
return nil, err
}
var p AlertmanagerList
return &p, json.Unmarshal(b, &p)
}
func (a *alertmanagers) Watch(opts v1.ListOptions) (watch.Interface, error) {
r, err := a.restClient.Get().
Prefix("watch").
Namespace(a.ns).
Resource("alertmanagers").
// VersionedParams(&options, api.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&alertmanagerDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
}
// AlertmanagerFromUnstructured unmarshals an Alertmanager object from dynamic client's unstructured
func AlertmanagerFromUnstructured(r *unstructured.Unstructured) (*Alertmanager, error) {
b, err := json.Marshal(r.Object)
if err != nil {
return nil, err
}
var a Alertmanager
if err := json.Unmarshal(b, &a); err != nil {
return nil, err
}
a.TypeMeta.Kind = TPRAlertmanagersKind
a.TypeMeta.APIVersion = TPRGroup + "/" + TPRVersion
return &a, nil
}
// UnstructuredFromAlertmanager marshals an Alertmanager object into dynamic client's unstructured
func UnstructuredFromAlertmanager(a *Alertmanager) (*unstructured.Unstructured, error) {
a.TypeMeta.Kind = TPRAlertmanagersKind
a.TypeMeta.APIVersion = TPRGroup + "/" + TPRVersion
b, err := json.Marshal(a)
if err != nil {
return nil, err
}
var r unstructured.Unstructured
if err := json.Unmarshal(b, &r.Object); err != nil {
return nil, err
}
return &r, nil
}
type alertmanagerDecoder struct {
dec *json.Decoder
close func() error
}
func (d *alertmanagerDecoder) Close() {
d.close()
}
func (d *alertmanagerDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object Alertmanager
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}

View file

@ -0,0 +1,82 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v1alpha1
import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/runtime/schema"
"k8s.io/client-go/pkg/runtime/serializer"
"k8s.io/client-go/rest"
)
const (
TPRGroup = "monitoring.coreos.com"
TPRVersion = "v1alpha1"
)
type MonitoringV1alpha1Interface interface {
RESTClient() rest.Interface
PrometheusesGetter
AlertmanagersGetter
ServiceMonitorsGetter
}
type MonitoringV1alpha1Client struct {
restClient rest.Interface
dynamicClient *dynamic.Client
}
func (c *MonitoringV1alpha1Client) Prometheuses(namespace string) PrometheusInterface {
return newPrometheuses(c.restClient, c.dynamicClient, namespace)
}
func (c *MonitoringV1alpha1Client) Alertmanagers(namespace string) AlertmanagerInterface {
return newAlertmanagers(c.restClient, c.dynamicClient, namespace)
}
func (c *MonitoringV1alpha1Client) ServiceMonitors(namespace string) ServiceMonitorInterface {
return newServiceMonitors(c.restClient, c.dynamicClient, namespace)
}
func (c *MonitoringV1alpha1Client) RESTClient() rest.Interface {
return c.restClient
}
func NewForConfig(c *rest.Config) (*MonitoringV1alpha1Client, error) {
config := *c
setConfigDefaults(&config)
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewClient(&config)
if err != nil {
return nil, err
}
return &MonitoringV1alpha1Client{client, dynamicClient}, nil
}
func setConfigDefaults(config *rest.Config) {
config.GroupVersion = &schema.GroupVersion{
Group: TPRGroup,
Version: TPRVersion,
}
config.APIPath = "/apis"
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
return
}

View file

@ -0,0 +1,188 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v1alpha1
import (
"encoding/json"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/pkg/api/v1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/rest"
)
const (
TPRPrometheusesKind = "Prometheus"
TPRPrometheusName = "prometheuses"
)
type PrometheusesGetter interface {
Prometheuses(namespace string) PrometheusInterface
}
type PrometheusInterface interface {
Create(*Prometheus) (*Prometheus, error)
Get(name string) (*Prometheus, error)
Update(*Prometheus) (*Prometheus, error)
Delete(name string, options *v1.DeleteOptions) error
List(opts v1.ListOptions) (runtime.Object, error)
Watch(opts v1.ListOptions) (watch.Interface, error)
}
type prometheuses struct {
restClient rest.Interface
client *dynamic.ResourceClient
ns string
}
func newPrometheuses(r rest.Interface, c *dynamic.Client, namespace string) *prometheuses {
return &prometheuses{
r,
c.Resource(
&metav1.APIResource{
Kind: TPRPrometheusesKind,
Name: TPRPrometheusName,
Namespaced: true,
},
namespace,
),
namespace,
}
}
func (p *prometheuses) Create(o *Prometheus) (*Prometheus, error) {
up, err := UnstructuredFromPrometheus(o)
if err != nil {
return nil, err
}
up, err = p.client.Create(up)
if err != nil {
return nil, err
}
return PrometheusFromUnstructured(up)
}
func (p *prometheuses) Get(name string) (*Prometheus, error) {
obj, err := p.client.Get(name)
if err != nil {
return nil, err
}
return PrometheusFromUnstructured(obj)
}
func (p *prometheuses) Update(o *Prometheus) (*Prometheus, error) {
up, err := UnstructuredFromPrometheus(o)
if err != nil {
return nil, err
}
up, err = p.client.Update(up)
if err != nil {
return nil, err
}
return PrometheusFromUnstructured(up)
}
func (p *prometheuses) Delete(name string, options *v1.DeleteOptions) error {
return p.client.Delete(name, options)
}
func (p *prometheuses) List(opts v1.ListOptions) (runtime.Object, error) {
req := p.restClient.Get().
Namespace(p.ns).
Resource("prometheuses").
// VersionedParams(&options, v1.ParameterCodec)
FieldsSelectorParam(nil)
b, err := req.DoRaw()
if err != nil {
return nil, err
}
var prom PrometheusList
return &prom, json.Unmarshal(b, &prom)
}
func (p *prometheuses) Watch(opts v1.ListOptions) (watch.Interface, error) {
r, err := p.restClient.Get().
Prefix("watch").
Namespace(p.ns).
Resource("prometheuses").
// VersionedParams(&options, v1.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&prometheusDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
}
// PrometheusFromUnstructured unmarshals a Prometheus object from dynamic client's unstructured
func PrometheusFromUnstructured(r *unstructured.Unstructured) (*Prometheus, error) {
b, err := json.Marshal(r.Object)
if err != nil {
return nil, err
}
var p Prometheus
if err := json.Unmarshal(b, &p); err != nil {
return nil, err
}
p.TypeMeta.Kind = TPRPrometheusesKind
p.TypeMeta.APIVersion = TPRGroup + "/" + TPRVersion
return &p, nil
}
// UnstructuredFromPrometheus marshals a Prometheus object into dynamic client's unstructured
func UnstructuredFromPrometheus(p *Prometheus) (*unstructured.Unstructured, error) {
p.TypeMeta.Kind = TPRPrometheusesKind
p.TypeMeta.APIVersion = TPRGroup + "/" + TPRVersion
b, err := json.Marshal(p)
if err != nil {
return nil, err
}
var r unstructured.Unstructured
if err := json.Unmarshal(b, &r.Object); err != nil {
return nil, err
}
return &r, nil
}
type prometheusDecoder struct {
dec *json.Decoder
close func() error
}
func (d *prometheusDecoder) Close() {
d.close()
}
func (d *prometheusDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object Prometheus
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}

View file

@ -0,0 +1,188 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v1alpha1
import (
"encoding/json"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/pkg/api/v1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/rest"
)
const (
TPRServiceMonitorsKind = "ServiceMonitor"
TPRServiceMonitorName = "servicemonitors"
)
type ServiceMonitorsGetter interface {
ServiceMonitors(namespace string) *dynamic.ResourceClient
}
type ServiceMonitorInterface interface {
Create(*ServiceMonitor) (*ServiceMonitor, error)
Get(name string) (*ServiceMonitor, error)
Update(*ServiceMonitor) (*ServiceMonitor, error)
Delete(name string, options *v1.DeleteOptions) error
List(opts v1.ListOptions) (runtime.Object, error)
Watch(opts v1.ListOptions) (watch.Interface, error)
}
type servicemonitors struct {
restClient rest.Interface
client *dynamic.ResourceClient
ns string
}
func newServiceMonitors(r rest.Interface, c *dynamic.Client, namespace string) *servicemonitors {
return &servicemonitors{
r,
c.Resource(
&metav1.APIResource{
Kind: TPRServiceMonitorsKind,
Name: TPRServiceMonitorName,
Namespaced: true,
},
namespace,
),
namespace,
}
}
func (s *servicemonitors) Create(o *ServiceMonitor) (*ServiceMonitor, error) {
us, err := UnstructuredFromServiceMonitor(o)
if err != nil {
return nil, err
}
us, err = s.client.Create(us)
if err != nil {
return nil, err
}
return ServiceMonitorFromUnstructured(us)
}
func (s *servicemonitors) Get(name string) (*ServiceMonitor, error) {
obj, err := s.client.Get(name)
if err != nil {
return nil, err
}
return ServiceMonitorFromUnstructured(obj)
}
func (s *servicemonitors) Update(o *ServiceMonitor) (*ServiceMonitor, error) {
us, err := UnstructuredFromServiceMonitor(o)
if err != nil {
return nil, err
}
us, err = s.client.Update(us)
if err != nil {
return nil, err
}
return ServiceMonitorFromUnstructured(us)
}
func (s *servicemonitors) Delete(name string, options *v1.DeleteOptions) error {
return s.client.Delete(name, options)
}
func (s *servicemonitors) List(opts v1.ListOptions) (runtime.Object, error) {
req := s.restClient.Get().
Namespace(s.ns).
Resource("servicemonitors").
// VersionedParams(&options, v1.ParameterCodec)
FieldsSelectorParam(nil)
b, err := req.DoRaw()
if err != nil {
return nil, err
}
var sm ServiceMonitorList
return &sm, json.Unmarshal(b, &sm)
}
func (s *servicemonitors) Watch(opts v1.ListOptions) (watch.Interface, error) {
r, err := s.restClient.Get().
Prefix("watch").
Namespace(s.ns).
Resource("servicemonitors").
// VersionedParams(&options, v1.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&serviceMonitorDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
}
// ServiceMonitorFromUnstructured unmarshals a ServiceMonitor object from dynamic client's unstructured
func ServiceMonitorFromUnstructured(r *unstructured.Unstructured) (*ServiceMonitor, error) {
b, err := json.Marshal(r.Object)
if err != nil {
return nil, err
}
var s ServiceMonitor
if err := json.Unmarshal(b, &s); err != nil {
return nil, err
}
s.TypeMeta.Kind = TPRServiceMonitorsKind
s.TypeMeta.APIVersion = TPRGroup + "/" + TPRVersion
return &s, nil
}
// UnstructuredFromServiceMonitor marshals a ServiceMonitor object into dynamic client's unstructured
func UnstructuredFromServiceMonitor(s *ServiceMonitor) (*unstructured.Unstructured, error) {
s.TypeMeta.Kind = TPRServiceMonitorsKind
s.TypeMeta.APIVersion = TPRGroup + "/" + TPRVersion
b, err := json.Marshal(s)
if err != nil {
return nil, err
}
var r unstructured.Unstructured
if err := json.Unmarshal(b, &r.Object); err != nil {
return nil, err
}
return &r, nil
}
type serviceMonitorDecoder struct {
dec *json.Decoder
close func() error
}
func (d *serviceMonitorDecoder) Close() {
d.close()
}
func (d *serviceMonitorDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object ServiceMonitor
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package spec
package v1alpha1
import (
"k8s.io/client-go/pkg/api/v1"
@ -170,3 +170,6 @@ type Selector struct {
// Currently the selector is only used for namespaces which require more complex
// implementation to support label selections.
}
type ListOptions v1.ListOptions
type DeleteOptions v1.DeleteOptions

View file

@ -1,204 +0,0 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"encoding/json"
"time"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/runtime/schema"
"k8s.io/client-go/pkg/runtime/serializer"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"github.com/coreos/prometheus-operator/pkg/spec"
)
const resyncPeriod = 5 * time.Minute
type MonitoringClient struct {
*rest.RESTClient
}
func (c *MonitoringClient) Prometheuses(namespace string) PrometheusInterface {
return newPrometheuses(c, namespace)
}
func NewForConfig(c *rest.Config) (*MonitoringClient, error) {
client, err := NewPrometheusRESTClient(*c)
if err != nil {
return nil, err
}
return &MonitoringClient{client}, nil
}
func NewPrometheusRESTClient(c rest.Config) (*rest.RESTClient, error) {
c.APIPath = "/apis"
c.GroupVersion = &schema.GroupVersion{
Group: "monitoring.coreos.com",
Version: "v1alpha1",
}
// TODO(fabxc): is this even used with our custom list/watch functions?
c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
return rest.RESTClientFor(&c)
}
type PrometheusInterface interface {
Get(name string) (*spec.Prometheus, error)
}
type prometheuses struct {
client *MonitoringClient
ns string
}
func newPrometheuses(c *MonitoringClient, namespace string) *prometheuses {
return &prometheuses{
client: c,
ns: namespace,
}
}
func (p *prometheuses) Get(name string) (result *spec.Prometheus, err error) {
result = &spec.Prometheus{}
req := p.client.Get().
Namespace(p.ns).
Resource("prometheuses").
Name(name)
body, err := req.DoRaw()
if err != nil {
return nil, err
}
err = json.Unmarshal(body, &result)
return
}
type prometheusDecoder struct {
dec *json.Decoder
close func() error
}
func (d *prometheusDecoder) Close() {
d.close()
}
func (d *prometheusDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object spec.Prometheus
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}
// NewPrometheusListWatch returns a new ListWatch on the Prometheus resource.
func NewPrometheusListWatch(client *rest.RESTClient) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: cache.ListFunc(func(options v1.ListOptions) (runtime.Object, error) {
req := client.Get().
Namespace(v1.NamespaceAll).
Resource("prometheuses").
// VersionedParams(&options, v1.ParameterCodec)
FieldsSelectorParam(nil)
b, err := req.DoRaw()
if err != nil {
return nil, err
}
var p spec.PrometheusList
return &p, json.Unmarshal(b, &p)
}),
WatchFunc: cache.WatchFunc(func(options v1.ListOptions) (watch.Interface, error) {
r, err := client.Get().
Prefix("watch").
Namespace(v1.NamespaceAll).
Resource("prometheuses").
// VersionedParams(&options, v1.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&prometheusDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
}),
}
}
type serviceMonitorDecoder struct {
dec *json.Decoder
close func() error
}
func (d *serviceMonitorDecoder) Close() {
d.close()
}
func (d *serviceMonitorDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object spec.ServiceMonitor
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}
// NewServiceMonitorListWatch returns a new ListWatch on the ServiceMonitor resource.
func NewServiceMonitorListWatch(client *rest.RESTClient) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: cache.ListFunc(func(options v1.ListOptions) (runtime.Object, error) {
req := client.Get().
Namespace(v1.NamespaceAll).
Resource("servicemonitors").
// VersionedParams(&options, v1.ParameterCodec)
FieldsSelectorParam(nil)
b, err := req.DoRaw()
if err != nil {
return nil, err
}
var sm spec.ServiceMonitorList
return &sm, json.Unmarshal(b, &sm)
}),
WatchFunc: cache.WatchFunc(func(options v1.ListOptions) (watch.Interface, error) {
r, err := client.Get().
Prefix("watch").
Namespace(v1.NamespaceAll).
Resource("servicemonitors").
// VersionedParams(&options, v1.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&serviceMonitorDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
}),
}
}

View file

@ -20,9 +20,9 @@ import (
"time"
"github.com/coreos/prometheus-operator/pkg/analytics"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/queue"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/go-kit/kit/log"
"k8s.io/client-go/kubernetes"
@ -41,21 +41,17 @@ import (
)
const (
TPRGroup = "monitoring.coreos.com"
TPRVersion = "v1alpha1"
tprServiceMonitor = "service-monitor." + v1alpha1.TPRGroup
tprPrometheus = "prometheus." + v1alpha1.TPRGroup
TPRPrometheusesKind = "prometheuses"
TPRServiceMonitorsKind = "servicemonitors"
tprServiceMonitor = "service-monitor." + TPRGroup
tprPrometheus = "prometheus." + TPRGroup
resyncPeriod = 5 * time.Minute
)
// Operator manages lify cycle of Prometheus deployments and
// monitoring configurations.
type Operator struct {
kclient *kubernetes.Clientset
pclient *rest.RESTClient
mclient *v1alpha1.MonitoringV1alpha1Client
logger log.Logger
promInf cache.SharedIndexInformer
@ -86,25 +82,31 @@ func New(conf Config, logger log.Logger) (*Operator, error) {
return nil, err
}
promclient, err := NewPrometheusRESTClient(*cfg)
mclient, err := v1alpha1.NewForConfig(cfg)
if err != nil {
return nil, err
}
c := &Operator{
kclient: client,
pclient: promclient,
mclient: mclient,
logger: logger,
queue: queue.New(),
host: cfg.Host,
}
c.promInf = cache.NewSharedIndexInformer(
NewPrometheusListWatch(c.pclient),
&spec.Prometheus{}, resyncPeriod, cache.Indexers{},
&cache.ListWatch{
ListFunc: mclient.Prometheuses(api.NamespaceAll).List,
WatchFunc: mclient.Prometheuses(api.NamespaceAll).Watch,
},
&v1alpha1.Prometheus{}, resyncPeriod, cache.Indexers{},
)
c.smonInf = cache.NewSharedIndexInformer(
NewServiceMonitorListWatch(c.pclient),
&spec.ServiceMonitor{}, resyncPeriod, cache.Indexers{},
&cache.ListWatch{
ListFunc: mclient.ServiceMonitors(api.NamespaceAll).List,
WatchFunc: mclient.ServiceMonitors(api.NamespaceAll).Watch,
},
&v1alpha1.ServiceMonitor{}, resyncPeriod, cache.Indexers{},
)
c.cmapInf = cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(c.kclient.Core().RESTClient(), "configmaps", api.NamespaceAll, nil),
@ -296,7 +298,7 @@ func (c *Operator) enqueue(obj interface{}) {
// enqueueForNamespace enqueues all Prometheus object keys that belong to the given namespace.
func (c *Operator) enqueueForNamespace(ns string) {
cache.ListAll(c.promInf.GetStore(), labels.Everything(), func(obj interface{}) {
p := obj.(*spec.Prometheus)
p := obj.(*v1alpha1.Prometheus)
if p.Namespace == ns {
c.enqueue(p)
}
@ -328,7 +330,7 @@ func (c *Operator) worker() {
}
}
func (c *Operator) prometheusForStatefulSet(ps interface{}) *spec.Prometheus {
func (c *Operator) prometheusForStatefulSet(ps interface{}) *v1alpha1.Prometheus {
key, ok := c.keyFunc(ps)
if !ok {
return nil
@ -342,7 +344,7 @@ func (c *Operator) prometheusForStatefulSet(ps interface{}) *spec.Prometheus {
if !exists {
return nil
}
return p.(*spec.Prometheus)
return p.(*v1alpha1.Prometheus)
}
func (c *Operator) handleDeleteStatefulSet(obj interface{}) {
@ -391,7 +393,7 @@ func (c *Operator) sync(key string) error {
return c.destroyPrometheus(key)
}
p := obj.(*spec.Prometheus)
p := obj.(*v1alpha1.Prometheus)
if p.Spec.Paused {
return nil
}
@ -456,7 +458,7 @@ func ListOptions(name string) v1.ListOptions {
// create new pods.
//
// TODO(fabxc): remove this once the StatefulSet controller learns how to do rolling updates.
func (c *Operator) syncVersion(key string, p *spec.Prometheus) error {
func (c *Operator) syncVersion(key string, p *v1alpha1.Prometheus) error {
status, oldPods, err := PrometheusStatus(c.kclient, p)
if err != nil {
return err
@ -488,8 +490,8 @@ func (c *Operator) syncVersion(key string, p *spec.Prometheus) error {
return nil
}
func PrometheusStatus(kclient *kubernetes.Clientset, p *spec.Prometheus) (*spec.PrometheusStatus, []*v1.Pod, error) {
res := &spec.PrometheusStatus{}
func PrometheusStatus(kclient *kubernetes.Clientset, p *v1alpha1.Prometheus) (*v1alpha1.PrometheusStatus, []*v1.Pod, error) {
res := &v1alpha1.PrometheusStatus{}
pods, err := kclient.Core().Pods(p.Namespace).List(ListOptions(p.Name))
if err != nil {
@ -572,7 +574,7 @@ func (c *Operator) destroyPrometheus(key string) error {
return nil
}
func (c *Operator) createConfig(p *spec.Prometheus) error {
func (c *Operator) createConfig(p *v1alpha1.Prometheus) error {
smons, err := c.selectServiceMonitors(p)
if err != nil {
return err
@ -603,9 +605,9 @@ func (c *Operator) createConfig(p *spec.Prometheus) error {
return err
}
func (c *Operator) selectServiceMonitors(p *spec.Prometheus) (map[string]*spec.ServiceMonitor, error) {
func (c *Operator) selectServiceMonitors(p *v1alpha1.Prometheus) (map[string]*v1alpha1.ServiceMonitor, error) {
// Selectors might overlap. Deduplicate them along the keyFunc.
res := make(map[string]*spec.ServiceMonitor)
res := make(map[string]*v1alpha1.ServiceMonitor)
selector, err := metav1.LabelSelectorAsSelector(p.Spec.ServiceMonitorSelector)
if err != nil {
@ -617,7 +619,7 @@ func (c *Operator) selectServiceMonitors(p *spec.Prometheus) (map[string]*spec.S
cache.ListAllByNamespace(c.smonInf.GetIndexer(), p.Namespace, selector, func(obj interface{}) {
k, ok := c.keyFunc(obj)
if ok {
res[k] = obj.(*spec.ServiceMonitor)
res[k] = obj.(*v1alpha1.ServiceMonitor)
}
})
@ -631,7 +633,7 @@ func (c *Operator) createTPRs() error {
Name: tprServiceMonitor,
},
Versions: []extensionsobj.APIVersion{
{Name: TPRVersion},
{Name: v1alpha1.TPRVersion},
},
Description: "Prometheus monitoring for a service",
},
@ -640,7 +642,7 @@ func (c *Operator) createTPRs() error {
Name: tprPrometheus,
},
Versions: []extensionsobj.APIVersion{
{Name: TPRVersion},
{Name: v1alpha1.TPRVersion},
},
Description: "Managed Prometheus server",
},
@ -655,9 +657,9 @@ func (c *Operator) createTPRs() error {
}
// We have to wait for the TPRs to be ready. Otherwise the initial watch may fail.
err := k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), TPRGroup, TPRVersion, TPRPrometheusesKind)
err := k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRPrometheusName)
if err != nil {
return err
}
return k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), TPRGroup, TPRVersion, TPRServiceMonitorsKind)
return k8sutil.WaitForTPRReady(c.kclient.CoreV1().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRServiceMonitorName)
}

View file

@ -22,7 +22,7 @@ import (
yaml "gopkg.in/yaml.v2"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
)
var (
@ -33,7 +33,7 @@ func sanitizeLabelName(name string) string {
return invalidLabelCharRE.ReplaceAllString(name, "_")
}
func generateConfig(p *spec.Prometheus, mons map[string]*spec.ServiceMonitor) ([]byte, error) {
func generateConfig(p *v1alpha1.Prometheus, mons map[string]*v1alpha1.ServiceMonitor) ([]byte, error) {
cfg := map[string]interface{}{}
cfg["global"] = map[string]string{
@ -62,7 +62,7 @@ func generateConfig(p *spec.Prometheus, mons map[string]*spec.ServiceMonitor) ([
return yaml.Marshal(cfg)
}
func generateServiceMonitorConfig(m *spec.ServiceMonitor, ep spec.Endpoint, i int) interface{} {
func generateServiceMonitorConfig(m *v1alpha1.ServiceMonitor, ep v1alpha1.Endpoint, i int) interface{} {
cfg := map[string]interface{}{
"job_name": fmt.Sprintf("%s/%s/%d", m.Namespace, m.Name, i),
"kubernetes_sd_configs": []map[string]interface{}{
@ -238,7 +238,7 @@ func generateServiceMonitorConfig(m *spec.ServiceMonitor, ep spec.Endpoint, i in
return cfg
}
func generateAlertmanagerConfig(am spec.AlertmanagerEndpoints) interface{} {
func generateAlertmanagerConfig(am v1alpha1.AlertmanagerEndpoints) interface{} {
if am.Scheme == "" {
am.Scheme = "http"
}

View file

@ -24,10 +24,10 @@ import (
"k8s.io/client-go/pkg/apis/apps/v1beta1"
"k8s.io/client-go/pkg/util/intstr"
"github.com/coreos/prometheus-operator/pkg/spec"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
)
func makeStatefulSet(p spec.Prometheus, old *v1beta1.StatefulSet) *v1beta1.StatefulSet {
func makeStatefulSet(p v1alpha1.Prometheus, old *v1beta1.StatefulSet) *v1beta1.StatefulSet {
// TODO(fabxc): is this the right point to inject defaults?
// Ideally we would do it before storing but that's currently not possible.
// Potentially an update handler on first insertion.
@ -109,7 +109,7 @@ func makeEmptyRules(name string) *v1.ConfigMap {
}
}
func makeStatefulSetService(p *spec.Prometheus) *v1.Service {
func makeStatefulSetService(p *v1alpha1.Prometheus) *v1.Service {
svc := &v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: "prometheus",
@ -131,7 +131,7 @@ func makeStatefulSetService(p *spec.Prometheus) *v1.Service {
return svc
}
func makeStatefulSetSpec(p spec.Prometheus) v1beta1.StatefulSetSpec {
func makeStatefulSetSpec(p v1alpha1.Prometheus) v1beta1.StatefulSetSpec {
// Prometheus may take quite long to shut down to checkpoint existing data.
// Allow up to 10 minutes for clean termination.
terminationGracePeriod := int64(600)
@ -291,7 +291,7 @@ func makeStatefulSetSpec(p spec.Prometheus) v1beta1.StatefulSetSpec {
}
}
func subPathForStorage(s *spec.StorageSpec) string {
func subPathForStorage(s *v1alpha1.StorageSpec) string {
if s == nil {
return ""
}

View file

@ -16,36 +16,18 @@ package e2e
import (
"testing"
"time"
"k8s.io/client-go/pkg/api/v1"
"github.com/coreos/prometheus-operator/pkg/prometheus"
"github.com/coreos/prometheus-operator/pkg/spec"
)
func TestCreateCluster(t *testing.T) {
spec := &spec.Prometheus{
ObjectMeta: v1.ObjectMeta{
Name: "prometheus-test",
},
Spec: spec.PrometheusSpec{
Replicas: 1,
},
}
_, err := framework.CreatePrometheus(spec)
if err != nil {
t.Fatal(err)
}
func TestAlertmanagerCreateDeleteCluster(t *testing.T) {
name := "alertmanager-test"
defer func() {
if err := framework.DeletePrometheus("prometheus-test"); err != nil {
if err := framework.DeleteAlertmanagerAndWaitUntilGone(name); err != nil {
t.Fatal(err)
}
}()
if _, err := framework.WaitForPodsReady(time.Minute*2, 1, prometheus.ListOptions("prometheus-test")); err != nil {
t.Fatalf("failed to create 1 Prometheus instances: %v", err)
if err := framework.CreateAlertmanagerAndWaitUntilReady(framework.MakeBasicAlertmanager(name, 3)); err != nil {
t.Fatal(err)
}
}

View file

@ -0,0 +1,89 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package framework
import (
"fmt"
"time"
"k8s.io/client-go/pkg/api/v1"
"github.com/coreos/prometheus-operator/pkg/alertmanager"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
)
var ValidAlertmanagerConfig = `global:
resolve_timeout: 5m
route:
group_by: ['job']
group_wait: 30s
group_interval: 5m
repeat_interval: 12h
receiver: 'webhook'
receivers:
- name: 'webhook'
webhook_configs:
- url: 'http://alertmanagerwh:30500/'
`
func (f *Framework) MakeBasicAlertmanager(name string, replicas int32) *v1alpha1.Alertmanager {
return &v1alpha1.Alertmanager{
ObjectMeta: v1.ObjectMeta{
Name: name,
},
Spec: v1alpha1.AlertmanagerSpec{
Replicas: replicas,
},
}
}
func (f *Framework) CreateAlertmanagerAndWaitUntilReady(a *v1alpha1.Alertmanager) error {
_, err := f.KubeClient.CoreV1().ConfigMaps(f.Namespace.Name).Create(
&v1.ConfigMap{
ObjectMeta: v1.ObjectMeta{
Name: a.Name,
},
Data: map[string]string{
"alertmanager.yaml": ValidAlertmanagerConfig,
},
},
)
if err != nil {
return err
}
_, err = f.MonClient.Alertmanagers(f.Namespace.Name).Create(a)
if err != nil {
return err
}
_, err = f.WaitForPodsReady(time.Minute*2, int(a.Spec.Replicas), alertmanager.ListOptions(a.Name))
if err != nil {
return fmt.Errorf("failed to create an Alertmanager cluster (%s) with %d instances: %v", a.Name, a.Spec.Replicas, err)
}
return nil
}
func (f *Framework) DeleteAlertmanagerAndWaitUntilGone(name string) error {
if err := f.MonClient.Alertmanagers(f.Namespace.Name).Delete(name, nil); err != nil {
return err
}
if _, err := f.WaitForPodsReady(time.Minute*2, 0, alertmanager.ListOptions(name)); err != nil {
return fmt.Errorf("failed to teardown Alertmanager (%s) instances: %v", name, err)
}
return f.KubeClient.CoreV1().ConfigMaps(f.Namespace.Name).Delete(name, nil)
}

View file

@ -15,8 +15,6 @@
package framework
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"os"
@ -32,13 +30,13 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/prometheus"
"github.com/coreos/prometheus-operator/pkg/spec"
)
type Framework struct {
KubeClient kubernetes.Interface
MonClient *v1alpha1.MonitoringV1alpha1Client
HTTPClient *http.Client
MasterHost string
Namespace *v1.Namespace
@ -62,6 +60,11 @@ func New(ns, kubeconfig, opImage string) (*Framework, error) {
return nil, err
}
mclient, err := v1alpha1.NewForConfig(config)
if err != nil {
return nil, err
}
namespace, err := cli.Core().Namespaces().Create(&v1.Namespace{
ObjectMeta: v1.ObjectMeta{
Name: ns,
@ -74,6 +77,7 @@ func New(ns, kubeconfig, opImage string) (*Framework, error) {
f := &Framework{
MasterHost: config.Host,
KubeClient: cli,
MonClient: mclient,
HTTPClient: httpc,
Namespace: namespace,
}
@ -126,12 +130,17 @@ func (f *Framework) setupPrometheusOperator(opImage string) error {
}
f.OperatorPod = &pl.Items[0]
err = k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), prometheus.TPRGroup, prometheus.TPRVersion, prometheus.TPRPrometheusesKind)
err = k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRPrometheusName)
if err != nil {
return err
}
return k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), prometheus.TPRGroup, prometheus.TPRVersion, prometheus.TPRServiceMonitorsKind)
err = k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRServiceMonitorName)
if err != nil {
return err
}
return k8sutil.WaitForTPRReady(f.KubeClient.Core().RESTClient(), v1alpha1.TPRGroup, v1alpha1.TPRVersion, v1alpha1.TPRAlertmanagerName)
}
// Teardown tears down a previously initialized test environment.
@ -165,7 +174,7 @@ func waitForPodsReady(client v1client.CoreV1Interface, timeout time.Duration, ex
}
runningAndReady := 0
if len(pl.Items) > 0 {
if len(pl.Items) >= 0 {
for _, p := range pl.Items {
isRunningAndReady, err := k8sutil.PodRunningAndReady(p)
if err != nil {
@ -191,46 +200,6 @@ func (f *Framework) CreateDeployment(kclient kubernetes.Interface, ns string, de
return nil
}
func (f *Framework) CreatePrometheus(e *spec.Prometheus) (*spec.Prometheus, error) {
b, err := json.Marshal(e)
if err != nil {
return nil, err
}
resp, err := f.HTTPClient.Post(
fmt.Sprintf("%s/apis/monitoring.coreos.com/v1alpha1/namespaces/%s/prometheuses", f.MasterHost, f.Namespace.Name),
"application/json", bytes.NewReader(b))
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("unexpected status: %v", resp.Status)
}
decoder := yaml.NewYAMLOrJSONDecoder(resp.Body, 100)
res := &spec.Prometheus{}
if err := decoder.Decode(res); err != nil {
return nil, err
}
return res, nil
}
func (f *Framework) DeletePrometheus(name string) error {
req, err := http.NewRequest("DELETE",
fmt.Sprintf("%s/apis/monitoring.coreos.com/v1alpha1/namespaces/%s/prometheuses/%s", f.MasterHost, f.Namespace.Name, name), nil)
if err != nil {
return err
}
resp, err := f.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status: %v", resp.Status)
}
return nil
}
func (f *Framework) createDeployment(deploy *v1beta1.Deployment) error {
if _, err := f.KubeClient.Extensions().Deployments(f.Namespace.Name).Create(deploy); err != nil {
return err

View file

@ -0,0 +1,62 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package framework
import (
"fmt"
"time"
"k8s.io/client-go/pkg/api/v1"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/prometheus"
)
func (f *Framework) MakeBasicPrometheus(name string, replicas int32) *v1alpha1.Prometheus {
return &v1alpha1.Prometheus{
ObjectMeta: v1.ObjectMeta{
Name: name,
},
Spec: v1alpha1.PrometheusSpec{
Replicas: replicas,
},
}
}
func (f *Framework) CreatePrometheusAndWaitUntilReady(p *v1alpha1.Prometheus) error {
_, err := f.MonClient.Prometheuses(f.Namespace.Name).Create(p)
if err != nil {
return err
}
_, err = f.WaitForPodsReady(time.Minute*2, int(p.Spec.Replicas), prometheus.ListOptions(p.Name))
if err != nil {
return fmt.Errorf("failed to create %d Prometheus instances (%s): %v", p.Spec.Replicas, p.Name, err)
}
return nil
}
func (f *Framework) DeletePrometheusAndWaitUntilGone(name string) error {
if err := f.MonClient.Prometheuses(f.Namespace.Name).Delete(name, nil); err != nil {
return err
}
if _, err := f.WaitForPodsReady(time.Minute*2, 0, prometheus.ListOptions(name)); err != nil {
return fmt.Errorf("failed to teardown Prometheus instances (%s): %v", name, err)
}
return nil
}

View file

@ -0,0 +1,33 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"testing"
)
func TestPrometheusCreateDeleteCluster(t *testing.T) {
name := "prometheus-test"
defer func() {
if err := framework.DeletePrometheusAndWaitUntilGone(name); err != nil {
t.Fatal(err)
}
}()
if err := framework.CreatePrometheusAndWaitUntilReady(framework.MakeBasicPrometheus(name, 1)); err != nil {
t.Fatal(err)
}
}

300
vendor/k8s.io/client-go/dynamic/client.go generated vendored Normal file
View file

@ -0,0 +1,300 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package dynamic provides a client interface to arbitrary Kubernetes
// APIs that exposes common high level operations and exposes common
// metadata.
package dynamic
import (
"encoding/json"
"errors"
"io"
"net/url"
"strings"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/pkg/conversion/queryparams"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/runtime/schema"
"k8s.io/client-go/pkg/runtime/serializer"
"k8s.io/client-go/pkg/util/flowcontrol"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/rest"
)
// Client is a Kubernetes client that allows you to access metadata
// and manipulate metadata of a Kubernetes API group.
type Client struct {
cl *rest.RESTClient
parameterCodec runtime.ParameterCodec
}
// NewClient returns a new client based on the passed in config. The
// codec is ignored, as the dynamic client uses it's own codec.
func NewClient(conf *rest.Config) (*Client, error) {
// avoid changing the original config
confCopy := *conf
conf = &confCopy
contentConfig := ContentConfig()
contentConfig.GroupVersion = conf.GroupVersion
if conf.NegotiatedSerializer != nil {
contentConfig.NegotiatedSerializer = conf.NegotiatedSerializer
}
conf.ContentConfig = contentConfig
if conf.APIPath == "" {
conf.APIPath = "/api"
}
if len(conf.UserAgent) == 0 {
conf.UserAgent = rest.DefaultKubernetesUserAgent()
}
cl, err := rest.RESTClientFor(conf)
if err != nil {
return nil, err
}
return &Client{cl: cl}, nil
}
// GetRateLimiter returns rate limier.
func (c *Client) GetRateLimiter() flowcontrol.RateLimiter {
return c.cl.GetRateLimiter()
}
// Resource returns an API interface to the specified resource for this client's
// group and version. If resource is not a namespaced resource, then namespace
// is ignored. The ResourceClient inherits the parameter codec of c.
func (c *Client) Resource(resource *metav1.APIResource, namespace string) *ResourceClient {
return &ResourceClient{
cl: c.cl,
resource: resource,
ns: namespace,
parameterCodec: c.parameterCodec,
}
}
// ParameterCodec returns a client with the provided parameter codec.
func (c *Client) ParameterCodec(parameterCodec runtime.ParameterCodec) *Client {
return &Client{
cl: c.cl,
parameterCodec: parameterCodec,
}
}
// ResourceClient is an API interface to a specific resource under a
// dynamic client.
type ResourceClient struct {
cl *rest.RESTClient
resource *metav1.APIResource
ns string
parameterCodec runtime.ParameterCodec
}
// List returns a list of objects for this resource.
func (rc *ResourceClient) List(opts runtime.Object) (runtime.Object, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
return rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(opts, parameterEncoder).
Do().
Get()
}
// Get gets the resource with the specified name.
func (rc *ResourceClient) Get(name string) (*unstructured.Unstructured, error) {
result := new(unstructured.Unstructured)
err := rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(name).
Do().
Into(result)
return result, err
}
// Delete deletes the resource with the specified name.
func (rc *ResourceClient) Delete(name string, opts *v1.DeleteOptions) error {
return rc.cl.Delete().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(name).
Body(opts).
Do().
Error()
}
// DeleteCollection deletes a collection of objects.
func (rc *ResourceClient) DeleteCollection(deleteOptions *v1.DeleteOptions, listOptions runtime.Object) error {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
return rc.cl.Delete().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(listOptions, parameterEncoder).
Body(deleteOptions).
Do().
Error()
}
// Create creates the provided resource.
func (rc *ResourceClient) Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
result := new(unstructured.Unstructured)
err := rc.cl.Post().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Body(obj).
Do().
Into(result)
return result, err
}
// Update updates the provided resource.
func (rc *ResourceClient) Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
result := new(unstructured.Unstructured)
if len(obj.GetName()) == 0 {
return result, errors.New("object missing name")
}
err := rc.cl.Put().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(obj.GetName()).
Body(obj).
Do().
Into(result)
return result, err
}
// Watch returns a watch.Interface that watches the resource.
func (rc *ResourceClient) Watch(opts runtime.Object) (watch.Interface, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
return rc.cl.Get().
Prefix("watch").
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(opts, parameterEncoder).
Watch()
}
func (rc *ResourceClient) Patch(name string, pt api.PatchType, data []byte) (*unstructured.Unstructured, error) {
result := new(unstructured.Unstructured)
err := rc.cl.Patch(pt).
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(name).
Body(data).
Do().
Into(result)
return result, err
}
// dynamicCodec is a codec that wraps the standard unstructured codec
// with special handling for Status objects.
type dynamicCodec struct{}
func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj)
if err != nil {
return nil, nil, err
}
if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" {
obj = &metav1.Status{}
err := json.Unmarshal(data, obj)
if err != nil {
return nil, nil, err
}
}
return obj, gvk, nil
}
func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error {
return unstructured.UnstructuredJSONScheme.Encode(obj, w)
}
// ContentConfig returns a rest.ContentConfig for dynamic types.
func ContentConfig() rest.ContentConfig {
var jsonInfo runtime.SerializerInfo
// TODO: api.Codecs here should become "pkg/apis/server/scheme" which is the minimal core you need
// to talk to a kubernetes server
for _, info := range api.Codecs.SupportedMediaTypes() {
if info.MediaType == runtime.ContentTypeJSON {
jsonInfo = info
break
}
}
jsonInfo.Serializer = dynamicCodec{}
jsonInfo.PrettySerializer = nil
return rest.ContentConfig{
AcceptContentTypes: runtime.ContentTypeJSON,
ContentType: runtime.ContentTypeJSON,
NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(jsonInfo),
}
}
// paramaterCodec is a codec converts an API object to query
// parameters without trying to convert to the target version.
type parameterCodec struct{}
func (parameterCodec) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) {
return queryparams.Convert(obj)
}
func (parameterCodec) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error {
return errors.New("DecodeParameters not implemented on dynamic parameterCodec")
}
var defaultParameterEncoder runtime.ParameterCodec = parameterCodec{}
type versionedParameterEncoderWithV1Fallback struct{}
func (versionedParameterEncoderWithV1Fallback) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) {
ret, err := api.ParameterCodec.EncodeParameters(obj, to)
if err != nil && runtime.IsNotRegisteredError(err) {
// fallback to v1
return api.ParameterCodec.EncodeParameters(obj, v1.SchemeGroupVersion)
}
return ret, err
}
func (versionedParameterEncoderWithV1Fallback) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error {
return errors.New("DecodeParameters not implemented on versionedParameterEncoderWithV1Fallback")
}
// VersionedParameterEncoderWithV1Fallback is useful for encoding query
// parameters for thirdparty resources. It tries to convert object to the
// specified version before converting it to query parameters, and falls back to
// converting to v1 if the object is not registered in the specified version.
// For the record, currently API server always treats query parameters sent to a
// thirdparty resource endpoint as v1.
var VersionedParameterEncoderWithV1Fallback runtime.ParameterCodec = versionedParameterEncoderWithV1Fallback{}

115
vendor/k8s.io/client-go/dynamic/client_pool.go generated vendored Normal file
View file

@ -0,0 +1,115 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"sync"
"k8s.io/client-go/pkg/api/meta"
"k8s.io/client-go/pkg/runtime/schema"
"k8s.io/client-go/rest"
)
// ClientPool manages a pool of dynamic clients.
type ClientPool interface {
// ClientForGroupVersionKind returns a client configured for the specified groupVersionResource.
// Resource may be empty.
ClientForGroupVersionResource(resource schema.GroupVersionResource) (*Client, error)
// ClientForGroupVersionKind returns a client configured for the specified groupVersionKind.
// Kind may be empty.
ClientForGroupVersionKind(kind schema.GroupVersionKind) (*Client, error)
}
// APIPathResolverFunc knows how to convert a groupVersion to its API path. The Kind field is
// optional.
type APIPathResolverFunc func(kind schema.GroupVersionKind) string
// LegacyAPIPathResolverFunc can resolve paths properly with the legacy API.
func LegacyAPIPathResolverFunc(kind schema.GroupVersionKind) string {
if len(kind.Group) == 0 {
return "/api"
}
return "/apis"
}
// clientPoolImpl implements ClientPool and caches clients for the resource group versions
// is asked to retrieve. This type is thread safe.
type clientPoolImpl struct {
lock sync.RWMutex
config *rest.Config
clients map[schema.GroupVersion]*Client
apiPathResolverFunc APIPathResolverFunc
mapper meta.RESTMapper
}
// NewClientPool returns a ClientPool from the specified config. It reuses clients for the the same
// group version. It is expected this type may be wrapped by specific logic that special cases certain
// resources or groups.
func NewClientPool(config *rest.Config, mapper meta.RESTMapper, apiPathResolverFunc APIPathResolverFunc) ClientPool {
confCopy := *config
return &clientPoolImpl{
config: &confCopy,
clients: map[schema.GroupVersion]*Client{},
apiPathResolverFunc: apiPathResolverFunc,
mapper: mapper,
}
}
// ClientForGroupVersionResource uses the provided RESTMapper to identify the appropriate resource. Resource may
// be empty. If no matching kind is found the underlying client for that group is still returned.
func (c *clientPoolImpl) ClientForGroupVersionResource(resource schema.GroupVersionResource) (*Client, error) {
kinds, err := c.mapper.KindsFor(resource)
if err != nil {
if meta.IsNoMatchError(err) {
return c.ClientForGroupVersionKind(schema.GroupVersionKind{Group: resource.Group, Version: resource.Version})
}
return nil, err
}
return c.ClientForGroupVersionKind(kinds[0])
}
// ClientForGroupVersion returns a client for the specified groupVersion, creates one if none exists. Kind
// in the GroupVersionKind may be empty.
func (c *clientPoolImpl) ClientForGroupVersionKind(kind schema.GroupVersionKind) (*Client, error) {
c.lock.Lock()
defer c.lock.Unlock()
gv := kind.GroupVersion()
// do we have a client already configured?
if existingClient, found := c.clients[gv]; found {
return existingClient, nil
}
// avoid changing the original config
confCopy := *c.config
conf := &confCopy
// we need to set the api path based on group version, if no group, default to legacy path
conf.APIPath = c.apiPathResolverFunc(kind)
// we need to make a client
conf.GroupVersion = &gv
dynamicClient, err := NewClient(conf)
if err != nil {
return nil, err
}
c.clients[gv] = dynamicClient
return dynamicClient, nil
}

96
vendor/k8s.io/client-go/dynamic/dynamic_util.go generated vendored Normal file
View file

@ -0,0 +1,96 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"k8s.io/client-go/pkg/api/meta"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/runtime/schema"
)
// VersionInterfaces provides an object converter and metadata
// accessor appropriate for use with unstructured objects.
func VersionInterfaces(schema.GroupVersion) (*meta.VersionInterfaces, error) {
return &meta.VersionInterfaces{
ObjectConvertor: &unstructured.UnstructuredObjectConverter{},
MetadataAccessor: meta.NewAccessor(),
}, nil
}
// NewDiscoveryRESTMapper returns a RESTMapper based on discovery information.
func NewDiscoveryRESTMapper(resources []*metav1.APIResourceList, versionFunc meta.VersionInterfacesFunc) (*meta.DefaultRESTMapper, error) {
rm := meta.NewDefaultRESTMapper(nil, versionFunc)
for _, resourceList := range resources {
gv, err := schema.ParseGroupVersion(resourceList.GroupVersion)
if err != nil {
return nil, err
}
for _, resource := range resourceList.APIResources {
gvk := gv.WithKind(resource.Kind)
scope := meta.RESTScopeRoot
if resource.Namespaced {
scope = meta.RESTScopeNamespace
}
rm.Add(gvk, scope)
}
}
return rm, nil
}
// ObjectTyper provides an ObjectTyper implementation for
// unstructured.Unstructured object based on discovery information.
type ObjectTyper struct {
registered map[schema.GroupVersionKind]bool
}
// NewObjectTyper constructs an ObjectTyper from discovery information.
func NewObjectTyper(resources []*metav1.APIResourceList) (runtime.ObjectTyper, error) {
ot := &ObjectTyper{registered: make(map[schema.GroupVersionKind]bool)}
for _, resourceList := range resources {
gv, err := schema.ParseGroupVersion(resourceList.GroupVersion)
if err != nil {
return nil, err
}
for _, resource := range resourceList.APIResources {
ot.registered[gv.WithKind(resource.Kind)] = true
}
}
return ot, nil
}
// ObjectKinds returns a slice of one element with the
// group,version,kind of the provided object, or an error if the
// object is not *unstructured.Unstructured or has no group,version,kind
// information.
func (ot *ObjectTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind, bool, error) {
if _, ok := obj.(*unstructured.Unstructured); !ok {
return nil, false, fmt.Errorf("type %T is invalid for dynamic object typer", obj)
}
return []schema.GroupVersionKind{obj.GetObjectKind().GroupVersionKind()}, false, nil
}
// Recognizes returns true if the provided group,version,kind was in
// the discovery information.
func (ot *ObjectTyper) Recognizes(gvk schema.GroupVersionKind) bool {
return ot.registered[gvk]
}

6
vendor/vendor.json vendored
View file

@ -1195,6 +1195,12 @@
"revision": "5fe6fc56cb38d04ef4af601a03599c984229dea2",
"revisionTime": "2016-12-17T15:19:41Z"
},
{
"checksumSHA1": "8OmPeXMu5pavjOjPfh4CVKcqpv4=",
"path": "k8s.io/client-go/dynamic",
"revision": "5fe6fc56cb38d04ef4af601a03599c984229dea2",
"revisionTime": "2016-12-17T15:19:41Z"
},
{
"checksumSHA1": "LKZOLJW3XFi77HYufnVS30uYDg0=",
"path": "k8s.io/client-go/kubernetes",