Mirror of https://github.com/prometheus-operator/prometheus-operator.git

*: implement k8s.io/client-go workqueue

Frederic Branczyk 2017-01-26 15:39:06 +01:00
parent b9a3c7331e
commit 80e714917f
3 changed files with 46 additions and 191 deletions
Changed: pkg/alertmanager and pkg/prometheus (both operators switch to the client-go workqueue), pkg/queue (removed).

pkg/alertmanager

@@ -23,7 +23,6 @@ import (
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/prometheus"
"github.com/coreos/prometheus-operator/pkg/queue"
"github.com/go-kit/kit/log"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -38,6 +37,7 @@ import (
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/apis/apps/v1beta1"
extensionsobj "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/pkg/util/workqueue"
"k8s.io/client-go/tools/cache"
)
@@ -57,7 +57,7 @@ type Operator struct {
alrtInf cache.SharedIndexInformer
ssetInf cache.SharedIndexInformer
queue *queue.Queue
queue workqueue.RateLimitingInterface
host string
}
@@ -82,7 +82,7 @@ func New(c prometheus.Config, logger log.Logger) (*Operator, error) {
kclient: client,
mclient: mclient,
logger: logger,
queue: queue.New(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "alertmanager"),
host: cfg.Host,
}
@@ -205,28 +205,29 @@ func (c *Operator) enqueueForNamespace(ns string) {
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (c *Operator) worker() {
for {
key, quit := c.queue.Get()
if quit {
return
}
if err := c.sync(key.(string)); err != nil {
utilruntime.HandleError(fmt.Errorf("reconciliation failed, re-enqueueing: %s", err))
// We only mark the item as done after waiting. In the meantime
// other items can be processed but the same item won't be processed again.
// This is a trivial form of rate-limiting that is sufficient for our throughput
// and latency expectations.
go func() {
time.Sleep(3 * time.Second)
c.queue.Done(key)
}()
continue
}
c.queue.Done(key)
for c.processNextWorkItem() {
}
}
func (c *Operator) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.sync(key.(string))
if err == nil {
c.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
c.queue.AddRateLimited(key)
return true
}
func (c *Operator) alertmanagerForStatefulSet(sset interface{}) *v1alpha1.Alertmanager {
key, ok := c.keyFunc(sset)
if !ok {
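
Note: the hunks shown only cover the queue field, its construction, and the worker loop. For orientation, the sketch below shows how such a rate-limited queue is typically fed by informer event handlers and shut down, assuming the same client-go packages this commit imports; the controller type, handler names, key function, and worker count are illustrative, not the operator's actual wiring.

package sketch

import (
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/pkg/util/workqueue"
	"k8s.io/client-go/tools/cache"
)

type controller struct {
	inf   cache.SharedIndexInformer
	queue workqueue.RateLimitingInterface
}

// enqueue converts an informer event into a namespace/name key. The queue
// deduplicates keys, so a burst of events for one object results in one sync.
func (c *controller) enqueue(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.queue.Add(key)
}

// sync stands in for the operator's per-key reconciliation.
func (c *controller) sync(key string) error { return nil }

// worker follows the loop added in this commit: Get blocks until a key is
// available, every Get is paired with Done, and Forget/AddRateLimited decide
// whether the key's retry backoff is reset or grows.
func (c *controller) worker() {
	for {
		key, quit := c.queue.Get()
		if quit {
			return
		}
		if err := c.sync(key.(string)); err != nil {
			c.queue.AddRateLimited(key)
		} else {
			c.queue.Forget(key)
		}
		c.queue.Done(key)
	}
}

// run registers the handlers, starts the informer and the workers, and shuts
// the queue down on exit so that blocked Get calls return quit == true.
func (c *controller) run(stopc <-chan struct{}) {
	defer c.queue.ShutDown()

	c.inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.enqueue,
		UpdateFunc: func(_, cur interface{}) { c.enqueue(cur) },
		DeleteFunc: c.enqueue,
	})
	go c.inf.Run(stopc)

	for i := 0; i < 2; i++ {
		go c.worker()
	}
	<-stopc
}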

pkg/prometheus

@@ -22,7 +22,6 @@ import (
"github.com/coreos/prometheus-operator/pkg/analytics"
"github.com/coreos/prometheus-operator/pkg/client/monitoring/v1alpha1"
"github.com/coreos/prometheus-operator/pkg/k8sutil"
"github.com/coreos/prometheus-operator/pkg/queue"
"github.com/go-kit/kit/log"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -37,6 +36,7 @@ import (
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/apis/apps/v1beta1"
extensionsobj "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/pkg/util/workqueue"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
@@ -60,7 +60,7 @@ type Operator struct {
cmapInf cache.SharedIndexInformer
ssetInf cache.SharedIndexInformer
queue *queue.Queue
queue workqueue.RateLimitingInterface
host string
}
@@ -91,7 +91,7 @@ func New(conf Config, logger log.Logger) (*Operator, error) {
kclient: client,
mclient: mclient,
logger: logger,
queue: queue.New(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "prometheus"),
host: cfg.Host,
}
@@ -309,28 +309,29 @@ func (c *Operator) enqueueForNamespace(ns string) {
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (c *Operator) worker() {
for {
key, quit := c.queue.Get()
if quit {
return
}
if err := c.sync(key.(string)); err != nil {
utilruntime.HandleError(fmt.Errorf("reconciliation failed, re-enqueueing: %s", err))
// We only mark the item as done after waiting. In the meantime
// other items can be processed but the same item won't be processed again.
// This is a trivial form of rate-limiting that is sufficient for our throughput
// and latency expectations.
go func() {
time.Sleep(3 * time.Second)
c.queue.Done(key)
}()
continue
}
c.queue.Done(key)
for c.processNextWorkItem() {
}
}
func (c *Operator) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.sync(key.(string))
if err == nil {
c.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
c.queue.AddRateLimited(key)
return true
}
func (c *Operator) prometheusForStatefulSet(sset interface{}) *v1alpha1.Prometheus {
key, ok := c.keyFunc(sset)
if !ok {
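
Note: the old worker retried a failed key after a fixed three-second sleep; the new one lets the queue's rate limiter pick the delay. AddRateLimited schedules the retry, and Forget resets the per-key backoff after a successful sync. DefaultControllerRateLimiter combines, roughly, a per-item exponential backoff with an overall rate limit. The standalone snippet below is not part of this commit; it only demonstrates the per-item backoff, and the key name is made up.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/pkg/util/workqueue"
)

func main() {
	// Per-item exponential backoff: 5ms, 10ms, 20ms, ... capped at 1s here.
	rl := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, time.Second)

	// Each failed sync (queue.AddRateLimited) asks the limiter when the key
	// may next be retried; consecutive failures double the delay.
	for i := 0; i < 4; i++ {
		fmt.Println(rl.When("default/main")) // 5ms, 10ms, 20ms, 40ms
	}

	// A successful sync (queue.Forget) resets the backoff for that key.
	rl.Forget("default/main")
	fmt.Println(rl.When("default/main")) // 5ms again
}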

pkg/queue (deleted)

@@ -1,147 +0,0 @@
// Copyright 2016 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import "sync"
// New constructs a new workqueue.
func New() *Queue {
return &Queue{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
}
}
// Queue is a work queue.
type Queue struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
shuttingDown bool
}
type empty struct{}
type t interface{}
type set map[t]empty
func (s set) has(item t) bool {
_, exists := s[item]
return exists
}
func (s set) insert(item t) {
s[item] = empty{}
}
func (s set) delete(item t) {
delete(s, item)
}
// Add marks item as needing processing.
func (q *Queue) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
return
}
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value, that can't be synchronized properly.
func (q *Queue) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.queue)
}
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Queue) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Queue) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Queue) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast()
}
func (q *Queue) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown
}
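
The comments above describe the dirty/processing bookkeeping the deleted Queue implemented, and which the client-go workqueue preserves: a key re-added while it is being processed is only marked dirty, is never handed to a second worker concurrently, and is re-queued exactly once by Done. A small hypothetical program against the removed package (it no longer builds after this commit) makes that concrete:

package main

import (
	"fmt"

	"github.com/coreos/prometheus-operator/pkg/queue" // removed by this commit
)

func main() {
	q := queue.New()

	q.Add("default/main")
	key, _ := q.Get() // the key moves from the queue into the processing set

	// Events arriving while the key is in flight only mark it dirty; the
	// queue never hands the same key to a second worker.
	q.Add("default/main")
	q.Add("default/main")
	fmt.Println(q.Len()) // 0: nothing is queued while the key is processing

	q.Done(key)          // processing finished, so the dirty key is re-queued
	fmt.Println(q.Len()) // 1: exactly one retry, however many Adds happened

	q.ShutDown()
}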