1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-07 00:17:13 +00:00
kyverno/pkg/event/controller.go
shivkumar dudhani 4166d13684 correct case
2019-07-19 16:18:36 -07:00

186 lines
4.7 KiB
Go

package event
import (
"time"
"github.com/golang/glog"
"github.com/nirmata/kyverno/pkg/client/clientset/versioned/scheme"
policyscheme "github.com/nirmata/kyverno/pkg/client/clientset/versioned/scheme"
v1alpha1 "github.com/nirmata/kyverno/pkg/client/listers/policy/v1alpha1"
client "github.com/nirmata/kyverno/pkg/dclient"
"github.com/nirmata/kyverno/pkg/sharedinformer"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
type controller struct {
client *client.Client
policyLister v1alpha1.PolicyLister
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
}
//Generator to generate event
type Generator interface {
Add(infoList ...*Info)
}
//Controller api
type Controller interface {
Generator
Run(stopCh <-chan struct{})
Stop()
}
//NewEventController to generate a new event controller
func NewEventController(client *client.Client,
shareInformer sharedinformer.PolicyInformer) Controller {
return &controller{
client: client,
policyLister: shareInformer.GetLister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName),
recorder: initRecorder(client),
}
}
func initRecorder(client *client.Client) record.EventRecorder {
// Initliaze Event Broadcaster
err := policyscheme.AddToScheme(scheme.Scheme)
if err != nil {
glog.Error(err)
return nil
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventInterface, err := client.GetEventsInterface()
if err != nil {
glog.Error(err) // TODO: add more specific error
return nil
}
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: eventInterface})
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
v1.EventSource{Component: eventSource})
return recorder
}
func (c *controller) Add(infos ...*Info) {
for _, info := range infos {
c.queue.Add(*info)
}
}
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
for i := 0; i < eventWorkerThreadCount; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
glog.Info("Started eventbuilder controller workers")
}
func (c *controller) Stop() {
defer c.queue.ShutDown()
glog.Info("Shutting down eventbuilder controller workers")
}
func (c *controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *controller) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
return
}
// This controller retries if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < workQueueRetryLimit {
glog.Warningf("Error syncing events %v: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
c.queue.AddRateLimited(key)
return
}
c.queue.Forget(key)
glog.Error(err)
glog.Warningf("Dropping the key out of the queue: %v", err)
}
func (c *controller) processNextWorkItem() bool {
obj, shutdown := c.queue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.queue.Done(obj)
var key Info
var ok bool
if key, ok = obj.(Info); !ok {
c.queue.Forget(obj)
glog.Warningf("Expecting type info by got %v\n", obj)
return nil
}
err := c.syncHandler(key)
c.handleErr(err, obj)
return nil
}(obj)
if err != nil {
glog.Error(err)
return true
}
return true
}
func (c *controller) syncHandler(key Info) error {
var robj runtime.Object
var err error
switch key.Kind {
case "Policy":
//TODO: policy is clustered resource so wont need namespace
robj, err = c.policyLister.Get(key.Name)
if err != nil {
glog.Errorf("Error creating event: unable to get policy %s, will retry ", key.Name)
return err
}
default:
robj, err = c.client.GetResource(key.Kind, key.Namespace, key.Name)
if err != nil {
glog.Errorf("Error creating event: unable to get resource %s, %s, will retry ", key.Kind, key.Namespace+"/"+key.Name)
return err
}
}
if key.Reason == PolicyApplied.String() {
c.recorder.Event(robj, v1.EventTypeNormal, key.Reason, key.Message)
} else {
c.recorder.Event(robj, v1.EventTypeWarning, key.Reason, key.Message)
}
return nil
}
//NewEvent returns a new event
func NewEvent(rkind string, rnamespace string, rname string, reason Reason, message MsgKey, args ...interface{}) *Info {
msgText, err := getEventMsg(message, args...)
if err != nil {
glog.Error(err)
}
return &Info{
Kind: rkind,
Name: rname,
Namespace: rnamespace,
Reason: reason.String(),
Message: msgText,
}
}