1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-06 07:57:07 +00:00
kyverno/pkg/event/controller.go

217 lines
5.6 KiB
Go
Raw Normal View History

2019-05-10 00:05:21 -07:00
package event
import (
"time"
"github.com/golang/glog"
2019-08-14 10:01:47 -07:00
2019-08-17 09:58:14 -07:00
"github.com/nirmata/kyverno/pkg/client/clientset/versioned/scheme"
2019-11-13 13:41:08 -08:00
kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1"
kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1"
client "github.com/nirmata/kyverno/pkg/dclient"
2019-05-10 00:05:21 -07:00
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
2019-11-15 15:59:37 -08:00
"k8s.io/client-go/tools/cache"
2019-05-10 00:05:21 -07:00
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
2019-08-09 13:41:56 -07:00
//Generator generate events
type Generator struct {
2019-11-15 15:59:37 -08:00
client *client.Client
// list/get cluster policy
pLister kyvernolister.ClusterPolicyLister
// returns true if the cluster policy store has been synced at least once
pSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
//Interface to generate event
type Interface interface {
2019-08-26 13:34:42 -07:00
Add(infoList ...Info)
2019-05-10 00:05:21 -07:00
}
2019-05-10 10:38:38 -07:00
2019-08-09 13:41:56 -07:00
//NewEventGenerator to generate a new event controller
2019-11-13 13:41:08 -08:00
func NewEventGenerator(client *client.Client, pInformer kyvernoinformer.ClusterPolicyInformer) *Generator {
2019-05-14 18:02:11 +03:00
2019-08-09 13:41:56 -07:00
gen := Generator{
client: client,
pLister: pInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName),
2019-11-15 15:59:37 -08:00
pSynced: pInformer.Informer().HasSynced,
recorder: initRecorder(client),
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
return &gen
2019-05-10 00:05:21 -07:00
}
func initRecorder(client *client.Client) record.EventRecorder {
2019-05-10 00:05:21 -07:00
// Initliaze Event Broadcaster
2019-08-17 09:58:14 -07:00
err := scheme.AddToScheme(scheme.Scheme)
2019-05-21 09:27:04 -07:00
if err != nil {
glog.Error(err)
2019-05-21 09:27:04 -07:00
return nil
}
2019-05-10 00:05:21 -07:00
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.V(4).Infof)
eventInterface, err := client.GetEventsInterface()
if err != nil {
glog.Error(err) // TODO: add more specific error
return nil
}
2019-05-10 00:05:21 -07:00
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: eventInterface})
2019-05-10 00:05:21 -07:00
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
v1.EventSource{Component: eventSource})
return recorder
}
2019-08-09 13:41:56 -07:00
//Add queues an event for generation
2019-08-26 13:34:42 -07:00
func (gen *Generator) Add(infos ...Info) {
2019-06-27 11:43:07 -07:00
for _, info := range infos {
if info.Name == "" {
// dont create event for resources with generateName
// as the name is not generated yet
glog.V(4).Infof("recieved info %v, not creating an event as the resource has not been assigned a name yet", info)
continue
}
2019-08-26 13:34:42 -07:00
gen.queue.Add(info)
2019-06-26 18:04:50 -07:00
}
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
// Run begins generator
func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
2019-05-10 00:05:21 -07:00
defer utilruntime.HandleCrash()
2019-08-09 13:41:56 -07:00
glog.Info("Starting event generator")
defer glog.Info("Shutting down event generator")
2019-05-10 00:05:21 -07:00
2019-11-15 15:59:37 -08:00
if !cache.WaitForCacheSync(stopCh, gen.pSynced) {
glog.Error("event generator: failed to sync informer cache")
}
2019-08-09 13:41:56 -07:00
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, time.Second, stopCh)
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
<-stopCh
}
2019-07-19 16:17:10 -07:00
2019-08-09 13:41:56 -07:00
func (gen *Generator) runWorker() {
for gen.processNextWorkItem() {
2019-05-10 00:05:21 -07:00
}
}
2019-08-09 13:41:56 -07:00
func (gen *Generator) handleErr(err error, key interface{}) {
2019-07-19 16:17:10 -07:00
if err == nil {
2019-08-09 13:41:56 -07:00
gen.queue.Forget(key)
2019-07-19 16:17:10 -07:00
return
}
// This controller retries if something goes wrong. After that, it stops trying.
2019-08-09 13:41:56 -07:00
if gen.queue.NumRequeues(key) < workQueueRetryLimit {
2019-07-19 16:17:10 -07:00
glog.Warningf("Error syncing events %v: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
2019-08-09 13:41:56 -07:00
gen.queue.AddRateLimited(key)
2019-07-19 16:17:10 -07:00
return
}
2019-08-09 13:41:56 -07:00
gen.queue.Forget(key)
2019-07-19 16:17:10 -07:00
glog.Error(err)
glog.Warningf("Dropping the key out of the queue: %v", err)
}
2019-08-09 13:41:56 -07:00
func (gen *Generator) processNextWorkItem() bool {
obj, shutdown := gen.queue.Get()
2019-05-10 00:05:21 -07:00
if shutdown {
return false
}
2019-07-19 16:17:10 -07:00
2019-05-10 00:05:21 -07:00
err := func(obj interface{}) error {
2019-08-09 13:41:56 -07:00
defer gen.queue.Done(obj)
2019-05-10 12:36:55 -07:00
var key Info
2019-05-10 00:05:21 -07:00
var ok bool
2019-07-19 16:17:10 -07:00
2019-05-10 12:36:55 -07:00
if key, ok = obj.(Info); !ok {
2019-08-09 13:41:56 -07:00
gen.queue.Forget(obj)
glog.Warningf("Expecting type info by got %v\n", obj)
2019-05-10 00:05:21 -07:00
return nil
}
2019-08-09 13:41:56 -07:00
err := gen.syncHandler(key)
gen.handleErr(err, obj)
2019-05-10 00:05:21 -07:00
return nil
}(obj)
if err != nil {
2019-07-19 16:17:10 -07:00
glog.Error(err)
return true
2019-05-10 00:05:21 -07:00
}
return true
}
2019-08-09 13:41:56 -07:00
func (gen *Generator) syncHandler(key Info) error {
2019-06-26 12:41:42 -07:00
var robj runtime.Object
2019-05-10 00:05:21 -07:00
var err error
switch key.Kind {
case "ClusterPolicy":
//TODO: policy is clustered resource so wont need namespace
robj, err = gen.pLister.Get(key.Name)
2019-05-10 00:05:21 -07:00
if err != nil {
2019-07-08 16:53:34 -07:00
glog.Errorf("Error creating event: unable to get policy %s, will retry ", key.Name)
2019-05-10 00:05:21 -07:00
return err
}
default:
2019-08-09 13:41:56 -07:00
robj, err = gen.client.GetResource(key.Kind, key.Namespace, key.Name)
2019-05-10 00:05:21 -07:00
if err != nil {
2019-07-08 18:03:21 -07:00
glog.Errorf("Error creating event: unable to get resource %s, %s, will retry ", key.Kind, key.Namespace+"/"+key.Name)
2019-05-10 00:05:21 -07:00
return err
}
}
2019-07-08 16:53:34 -07:00
2019-06-26 12:41:42 -07:00
if key.Reason == PolicyApplied.String() {
2019-08-09 13:41:56 -07:00
gen.recorder.Event(robj, v1.EventTypeNormal, key.Reason, key.Message)
2019-06-26 12:41:42 -07:00
} else {
2019-08-09 13:41:56 -07:00
gen.recorder.Event(robj, v1.EventTypeWarning, key.Reason, key.Message)
2019-06-26 12:41:42 -07:00
}
2019-05-10 00:05:21 -07:00
return nil
}
2019-08-09 13:41:56 -07:00
//TODO: check if we need this ?
2019-05-10 12:36:55 -07:00
//NewEvent returns a new event
func NewEvent(rkind string, rnamespace string, rname string, reason Reason, message MsgKey, args ...interface{}) *Info {
2019-06-26 15:31:18 -07:00
msgText, err := getEventMsg(message, args...)
2019-05-10 00:05:21 -07:00
if err != nil {
glog.Error(err)
2019-05-10 00:05:21 -07:00
}
2019-06-26 18:04:50 -07:00
return &Info{
2019-06-26 12:41:42 -07:00
Kind: rkind,
Name: rname,
Namespace: rnamespace,
Reason: reason.String(),
Message: msgText,
2019-05-10 00:05:21 -07:00
}
}
2019-08-26 13:34:42 -07:00
func NewEventNew(
rkind,
rapiVersion,
rnamespace,
rname,
reason string,
message MsgKey,
args ...interface{}) Info {
msgText, err := getEventMsg(message, args...)
if err != nil {
glog.Error(err)
}
return Info{
Kind: rkind,
Name: rname,
Namespace: rnamespace,
Reason: reason,
Message: msgText,
}
}