1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-06 07:57:07 +00:00
kyverno/pkg/event/controller.go

233 lines
6.5 KiB
Go
Raw Normal View History

2019-05-10 00:05:21 -07:00
package event
import (
"github.com/go-logr/logr"
2019-08-17 09:58:14 -07:00
"github.com/nirmata/kyverno/pkg/client/clientset/versioned/scheme"
2019-11-13 13:41:08 -08:00
kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1"
kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1"
"github.com/nirmata/kyverno/pkg/constant"
client "github.com/nirmata/kyverno/pkg/dclient"
2019-05-10 00:05:21 -07:00
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
2019-11-15 15:59:37 -08:00
"k8s.io/client-go/tools/cache"
2019-05-10 00:05:21 -07:00
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
2019-05-10 00:05:21 -07:00
)
2019-08-09 13:41:56 -07:00
//Generator generate events
type Generator struct {
2019-11-15 15:59:37 -08:00
client *client.Client
// list/get cluster policy
pLister kyvernolister.ClusterPolicyLister
// returns true if the cluster policy store has been synced at least once
pSynced cache.InformerSynced
// queue to store event generation requests
queue workqueue.RateLimitingInterface
// events generated at policy controller
policyCtrRecorder record.EventRecorder
// events generated at admission control
admissionCtrRecorder record.EventRecorder
// events generated at namespaced policy controller to process 'generate' rule
genPolicyRecorder record.EventRecorder
2020-03-17 11:05:20 -07:00
log logr.Logger
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
//Interface to generate event
type Interface interface {
2019-08-26 13:34:42 -07:00
Add(infoList ...Info)
2019-05-10 00:05:21 -07:00
}
2019-05-10 10:38:38 -07:00
2019-08-09 13:41:56 -07:00
//NewEventGenerator to generate a new event controller
2020-03-17 11:05:20 -07:00
func NewEventGenerator(client *client.Client, pInformer kyvernoinformer.ClusterPolicyInformer, log logr.Logger) *Generator {
2019-05-14 18:02:11 +03:00
2019-08-09 13:41:56 -07:00
gen := Generator{
client: client,
pLister: pInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(rateLimiter(), eventWorkQueueName),
pSynced: pInformer.Informer().HasSynced,
2020-03-17 11:05:20 -07:00
policyCtrRecorder: initRecorder(client, PolicyController, log),
admissionCtrRecorder: initRecorder(client, AdmissionController, log),
genPolicyRecorder: initRecorder(client, GeneratePolicyController, log),
log: log,
}
2019-08-09 13:41:56 -07:00
return &gen
2019-05-10 00:05:21 -07:00
}
func rateLimiter() workqueue.RateLimiter {
return workqueue.DefaultItemBasedRateLimiter()
}
2020-03-17 11:05:20 -07:00
func initRecorder(client *client.Client, eventSource Source, log logr.Logger) record.EventRecorder {
2019-05-10 00:05:21 -07:00
// Initliaze Event Broadcaster
2019-08-17 09:58:14 -07:00
err := scheme.AddToScheme(scheme.Scheme)
2019-05-21 09:27:04 -07:00
if err != nil {
2020-03-17 11:05:20 -07:00
log.Error(err, "failed to add to scheme")
2019-05-21 09:27:04 -07:00
return nil
}
2019-05-10 00:05:21 -07:00
eventBroadcaster := record.NewBroadcaster()
2020-05-15 18:33:52 -07:00
eventBroadcaster.StartLogging(klog.V(5).Infof)
eventInterface, err := client.GetEventsInterface()
if err != nil {
2020-03-17 11:05:20 -07:00
log.Error(err, "failed to get event interface for logging")
return nil
}
2019-05-10 00:05:21 -07:00
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: eventInterface})
2019-05-10 00:05:21 -07:00
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
v1.EventSource{Component: eventSource.String()})
2019-05-10 00:05:21 -07:00
return recorder
}
2019-08-09 13:41:56 -07:00
//Add queues an event for generation
2019-08-26 13:34:42 -07:00
func (gen *Generator) Add(infos ...Info) {
2020-03-17 11:05:20 -07:00
logger := gen.log
2019-06-27 11:43:07 -07:00
for _, info := range infos {
if info.Name == "" {
// dont create event for resources with generateName
// as the name is not generated yet
2020-03-17 11:05:20 -07:00
logger.V(4).Info("not creating an event as the resource has not been assigned a name yet", "kind", info.Kind, "name", info.Name, "namespace", info.Namespace)
continue
}
2019-08-26 13:34:42 -07:00
gen.queue.Add(info)
2019-06-26 18:04:50 -07:00
}
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
// Run begins generator
func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
2020-03-17 11:05:20 -07:00
logger := gen.log
2019-05-10 00:05:21 -07:00
defer utilruntime.HandleCrash()
2020-03-17 11:05:20 -07:00
logger.Info("start")
defer logger.Info("shutting down")
2019-05-10 00:05:21 -07:00
2019-11-15 15:59:37 -08:00
if !cache.WaitForCacheSync(stopCh, gen.pSynced) {
2020-03-17 11:05:20 -07:00
logger.Info("failed to sync informer cache")
2019-11-15 15:59:37 -08:00
}
2019-08-09 13:41:56 -07:00
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, constant.EventControllerResync, stopCh)
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
<-stopCh
}
2019-07-19 16:17:10 -07:00
2019-08-09 13:41:56 -07:00
func (gen *Generator) runWorker() {
for gen.processNextWorkItem() {
2019-05-10 00:05:21 -07:00
}
}
2019-08-09 13:41:56 -07:00
func (gen *Generator) handleErr(err error, key interface{}) {
2020-03-17 11:05:20 -07:00
logger := gen.log
2019-07-19 16:17:10 -07:00
if err == nil {
2019-08-09 13:41:56 -07:00
gen.queue.Forget(key)
2019-07-19 16:17:10 -07:00
return
}
// This controller retries if something goes wrong. After that, it stops trying.
2019-08-09 13:41:56 -07:00
if gen.queue.NumRequeues(key) < workQueueRetryLimit {
logger.V(4).Info("retrying event generation", "key", key, "reason", err.Error())
2019-07-19 16:17:10 -07:00
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
2019-08-09 13:41:56 -07:00
gen.queue.AddRateLimited(key)
2019-07-19 16:17:10 -07:00
return
}
2019-08-09 13:41:56 -07:00
gen.queue.Forget(key)
logger.Error(err, "failed to generate event", "key", key)
2019-07-19 16:17:10 -07:00
}
2019-08-09 13:41:56 -07:00
func (gen *Generator) processNextWorkItem() bool {
2020-03-17 11:05:20 -07:00
logger := gen.log
2019-08-09 13:41:56 -07:00
obj, shutdown := gen.queue.Get()
2019-05-10 00:05:21 -07:00
if shutdown {
return false
}
2019-07-19 16:17:10 -07:00
2019-05-10 00:05:21 -07:00
err := func(obj interface{}) error {
2019-08-09 13:41:56 -07:00
defer gen.queue.Done(obj)
2019-05-10 12:36:55 -07:00
var key Info
2019-05-10 00:05:21 -07:00
var ok bool
2019-07-19 16:17:10 -07:00
2019-05-10 12:36:55 -07:00
if key, ok = obj.(Info); !ok {
2019-08-09 13:41:56 -07:00
gen.queue.Forget(obj)
2020-03-17 11:05:20 -07:00
logger.Info("Incorrect type; expected type 'info'", "obj", obj)
2019-05-10 00:05:21 -07:00
return nil
}
2019-08-09 13:41:56 -07:00
err := gen.syncHandler(key)
gen.handleErr(err, obj)
2019-05-10 00:05:21 -07:00
return nil
}(obj)
if err != nil {
2020-03-17 11:05:20 -07:00
logger.Error(err, "failed to process next work item")
2019-07-19 16:17:10 -07:00
return true
2019-05-10 00:05:21 -07:00
}
return true
}
2019-08-09 13:41:56 -07:00
func (gen *Generator) syncHandler(key Info) error {
2020-03-17 11:05:20 -07:00
logger := gen.log
2019-06-26 12:41:42 -07:00
var robj runtime.Object
2019-05-10 00:05:21 -07:00
var err error
switch key.Kind {
case "ClusterPolicy":
//TODO: policy is clustered resource so wont need namespace
robj, err = gen.pLister.Get(key.Name)
2019-05-10 00:05:21 -07:00
if err != nil {
2020-03-17 11:05:20 -07:00
logger.Error(err, "failed to get policy", "name", key.Name)
2019-05-10 00:05:21 -07:00
return err
}
default:
robj, err = gen.client.GetResource("", key.Kind, key.Namespace, key.Name)
2019-05-10 00:05:21 -07:00
if err != nil {
2020-03-17 11:05:20 -07:00
logger.Error(err, "failed to get resource", "kind", key.Kind, "name", key.Name, "namespace", key.Namespace)
2019-05-10 00:05:21 -07:00
return err
}
}
2019-07-08 16:53:34 -07:00
// set the event type based on reason
eventType := v1.EventTypeWarning
2019-05-10 00:05:21 -07:00
// based on the source of event generation, use different event recorders
switch key.Source {
case AdmissionController:
gen.admissionCtrRecorder.Event(robj, eventType, key.Reason, key.Message)
case PolicyController:
gen.policyCtrRecorder.Event(robj, eventType, key.Reason, key.Message)
case GeneratePolicyController:
gen.genPolicyRecorder.Event(robj, eventType, key.Reason, key.Message)
default:
2020-03-17 11:05:20 -07:00
logger.Info("info.source not defined for the request")
2019-05-10 00:05:21 -07:00
}
return nil
2019-05-10 00:05:21 -07:00
}
2019-08-26 13:34:42 -07:00
//NewEvent builds a event creation request
func NewEvent(
2020-03-17 11:05:20 -07:00
log logr.Logger,
2019-08-26 13:34:42 -07:00
rkind,
rapiVersion,
rnamespace,
rname,
reason string,
source Source,
2019-08-26 13:34:42 -07:00
message MsgKey,
args ...interface{}) Info {
msgText, err := getEventMsg(message, args...)
if err != nil {
2020-03-17 11:05:20 -07:00
log.Error(err, "failed to get event message")
2019-08-26 13:34:42 -07:00
}
return Info{
Kind: rkind,
Name: rname,
Namespace: rnamespace,
Reason: reason,
Source: source,
2019-08-26 13:34:42 -07:00
Message: msgText,
}
}