1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-06 16:06:56 +00:00
kyverno/pkg/annotations/controller.go
2019-08-02 11:20:56 -07:00

108 lines
2.6 KiB
Go

package annotations
import (
"fmt"
"time"
"github.com/golang/glog"
client "github.com/nirmata/kyverno/pkg/dclient"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
type controller struct {
client *client.Client
queue workqueue.RateLimitingInterface
}
type Interface interface {
Add(rkind, rns, rname string, patch []byte)
}
type Controller interface {
Interface
Run(stopCh <-chan struct{})
Stop()
}
func NewAnnotationControler(client *client.Client) Controller {
return &controller{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), annotationQueueName),
}
}
func (c *controller) Add(rkind, rns, rname string, patch []byte) {
c.queue.Add(newInfo(rkind, rns, rname, &patch))
}
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
for i := 0; i < workerThreadCount; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
glog.Info("Started annotation controller workers")
}
func (c *controller) Stop() {
c.queue.ShutDown()
glog.Info("Shutting down annotation controller workers")
}
func (c *controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (pc *controller) processNextWorkItem() bool {
obj, shutdown := pc.queue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer pc.queue.Done(obj)
err := pc.syncHandler(obj)
pc.handleErr(err, obj)
return nil
}(obj)
if err != nil {
glog.Error(err)
return true
}
return true
}
func (pc *controller) handleErr(err error, key interface{}) {
if err == nil {
pc.queue.Forget(key)
return
}
// This controller retries if something goes wrong. After that, it stops trying.
if pc.queue.NumRequeues(key) < workQueueRetryLimit {
glog.Warningf("Error syncing events %v: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
pc.queue.AddRateLimited(key)
return
}
pc.queue.Forget(key)
glog.Error(err)
glog.Warningf("Dropping the key out of the queue: %v", err)
}
func (c *controller) syncHandler(obj interface{}) error {
var key info
var ok bool
if key, ok = obj.(info); !ok {
return fmt.Errorf("expected string in workqueue but got %#v", obj)
}
var err error
_, err = c.client.PatchResource(key.RKind, key.RNs, key.RName, *key.patch)
if err != nil {
glog.Errorf("Error creating annotation: unable to get resource %s/%s/%s, will retry: %s", key.RKind, key.RNs, key.RName, err)
return err
}
return nil
}