2019-05-10 00:05:21 -07:00
package event
import (
2020-12-23 17:48:00 -08:00
"time"
2020-07-20 08:00:02 -07:00
"github.com/go-logr/logr"
2020-10-07 11:12:31 -07:00
"github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme"
kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
kyvernolister "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
client "github.com/kyverno/kyverno/pkg/dclient"
2019-05-10 00:05:21 -07:00
v1 "k8s.io/api/core/v1"
2020-11-03 16:07:02 -08:00
errors "k8s.io/apimachinery/pkg/api/errors"
2019-05-10 00:05:21 -07:00
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
2019-11-15 15:59:37 -08:00
"k8s.io/client-go/tools/cache"
2019-05-10 00:05:21 -07:00
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
2021-02-07 20:26:56 -08:00
"k8s.io/klog/v2"
2019-05-10 00:05:21 -07:00
)
2019-08-09 13:41:56 -07:00
//Generator generate events
type Generator struct {
2019-11-15 15:59:37 -08:00
client * client . Client
// list/get cluster policy
2021-06-30 00:43:11 +03:00
cpLister kyvernolister . ClusterPolicyLister
2019-11-15 15:59:37 -08:00
// returns true if the cluster policy store has been synced at least once
2021-06-30 00:43:11 +03:00
cpSynced cache . InformerSynced
// list/get policy
pLister kyvernolister . PolicyLister
// returns true if the policy store has been synced at least once
2020-01-07 10:33:28 -08:00
pSynced cache . InformerSynced
// queue to store event generation requests
queue workqueue . RateLimitingInterface
2019-12-26 11:50:41 -08:00
// events generated at policy controller
policyCtrRecorder record . EventRecorder
// events generated at admission control
admissionCtrRecorder record . EventRecorder
// events generated at namespaced policy controller to process 'generate' rule
genPolicyRecorder record . EventRecorder
2020-03-17 11:05:20 -07:00
log logr . Logger
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
//Interface to generate event
type Interface interface {
2019-08-26 13:34:42 -07:00
Add ( infoList ... Info )
2019-05-10 00:05:21 -07:00
}
2019-05-10 10:38:38 -07:00
2019-08-09 13:41:56 -07:00
//NewEventGenerator to generate a new event controller
2022-01-21 18:36:05 +08:00
func NewEventGenerator ( client * client . Client , cpInformer kyvernoinformer . ClusterPolicyInformer , pInformer kyvernoinformer . PolicyInformer , log logr . Logger ) * Generator {
2019-05-14 18:02:11 +03:00
2019-08-09 13:41:56 -07:00
gen := Generator {
2019-12-26 11:50:41 -08:00
client : client ,
2021-06-30 00:43:11 +03:00
cpLister : cpInformer . Lister ( ) ,
cpSynced : cpInformer . Informer ( ) . HasSynced ,
2019-12-26 11:50:41 -08:00
pLister : pInformer . Lister ( ) ,
pSynced : pInformer . Informer ( ) . HasSynced ,
2021-06-30 00:43:11 +03:00
queue : workqueue . NewNamedRateLimitingQueue ( rateLimiter ( ) , eventWorkQueueName ) ,
2020-03-17 11:05:20 -07:00
policyCtrRecorder : initRecorder ( client , PolicyController , log ) ,
admissionCtrRecorder : initRecorder ( client , AdmissionController , log ) ,
genPolicyRecorder : initRecorder ( client , GeneratePolicyController , log ) ,
log : log ,
2019-12-26 11:50:41 -08:00
}
2019-08-09 13:41:56 -07:00
return & gen
2019-05-10 00:05:21 -07:00
}
2020-06-30 11:53:27 -07:00
func rateLimiter ( ) workqueue . RateLimiter {
2020-08-26 14:28:34 +05:30
return workqueue . DefaultItemBasedRateLimiter ( )
2020-06-30 11:53:27 -07:00
}
2020-03-17 11:05:20 -07:00
func initRecorder ( client * client . Client , eventSource Source , log logr . Logger ) record . EventRecorder {
2019-05-10 00:05:21 -07:00
// Initliaze Event Broadcaster
2019-08-17 09:58:14 -07:00
err := scheme . AddToScheme ( scheme . Scheme )
2019-05-21 09:27:04 -07:00
if err != nil {
2020-03-17 11:05:20 -07:00
log . Error ( err , "failed to add to scheme" )
2019-05-21 09:27:04 -07:00
return nil
}
2019-05-10 00:05:21 -07:00
eventBroadcaster := record . NewBroadcaster ( )
2020-05-15 18:33:52 -07:00
eventBroadcaster . StartLogging ( klog . V ( 5 ) . Infof )
2019-05-15 07:30:22 -07:00
eventInterface , err := client . GetEventsInterface ( )
if err != nil {
2020-03-17 11:05:20 -07:00
log . Error ( err , "failed to get event interface for logging" )
2019-05-15 07:30:22 -07:00
return nil
}
2019-05-10 00:05:21 -07:00
eventBroadcaster . StartRecordingToSink (
& typedcorev1 . EventSinkImpl {
2019-05-15 07:30:22 -07:00
Interface : eventInterface } )
2019-05-10 00:05:21 -07:00
recorder := eventBroadcaster . NewRecorder (
scheme . Scheme ,
2019-12-26 11:50:41 -08:00
v1 . EventSource { Component : eventSource . String ( ) } )
2019-05-10 00:05:21 -07:00
return recorder
}
2019-08-09 13:41:56 -07:00
//Add queues an event for generation
2019-08-26 13:34:42 -07:00
func ( gen * Generator ) Add ( infos ... Info ) {
2020-03-17 11:05:20 -07:00
logger := gen . log
2019-06-27 11:43:07 -07:00
for _ , info := range infos {
2019-09-04 15:30:09 -07:00
if info . Name == "" {
// dont create event for resources with generateName
// as the name is not generated yet
2020-03-17 11:05:20 -07:00
logger . V ( 4 ) . Info ( "not creating an event as the resource has not been assigned a name yet" , "kind" , info . Kind , "name" , info . Name , "namespace" , info . Namespace )
2019-09-04 15:30:09 -07:00
continue
}
2019-08-26 13:34:42 -07:00
gen . queue . Add ( info )
2019-06-26 18:04:50 -07:00
}
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
// Run begins generator
func ( gen * Generator ) Run ( workers int , stopCh <- chan struct { } ) {
2020-03-17 11:05:20 -07:00
logger := gen . log
2019-05-10 00:05:21 -07:00
defer utilruntime . HandleCrash ( )
2020-03-17 11:05:20 -07:00
logger . Info ( "start" )
defer logger . Info ( "shutting down" )
2019-05-10 00:05:21 -07:00
2021-06-30 00:43:11 +03:00
if ! cache . WaitForCacheSync ( stopCh , gen . cpSynced , gen . pSynced ) {
2020-03-17 11:05:20 -07:00
logger . Info ( "failed to sync informer cache" )
2019-11-15 15:59:37 -08:00
}
2019-08-09 13:41:56 -07:00
for i := 0 ; i < workers ; i ++ {
2020-12-23 17:48:00 -08:00
go wait . Until ( gen . runWorker , time . Second , stopCh )
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
<- stopCh
2019-05-15 07:30:22 -07:00
}
2019-07-19 16:17:10 -07:00
2019-08-09 13:41:56 -07:00
func ( gen * Generator ) runWorker ( ) {
for gen . processNextWorkItem ( ) {
2019-05-10 00:05:21 -07:00
}
}
2019-08-09 13:41:56 -07:00
func ( gen * Generator ) handleErr ( err error , key interface { } ) {
2020-03-17 11:05:20 -07:00
logger := gen . log
2019-07-19 16:17:10 -07:00
if err == nil {
2019-08-09 13:41:56 -07:00
gen . queue . Forget ( key )
2019-07-19 16:17:10 -07:00
return
}
// This controller retries if something goes wrong. After that, it stops trying.
2019-08-09 13:41:56 -07:00
if gen . queue . NumRequeues ( key ) < workQueueRetryLimit {
2020-06-30 11:53:27 -07:00
logger . V ( 4 ) . Info ( "retrying event generation" , "key" , key , "reason" , err . Error ( ) )
2019-07-19 16:17:10 -07:00
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
2019-08-09 13:41:56 -07:00
gen . queue . AddRateLimited ( key )
2019-07-19 16:17:10 -07:00
return
}
2020-06-30 11:53:27 -07:00
2019-08-09 13:41:56 -07:00
gen . queue . Forget ( key )
2020-11-03 16:07:02 -08:00
if ! errors . IsNotFound ( err ) {
logger . Error ( err , "failed to generate event" , "key" , key )
}
2019-07-19 16:17:10 -07:00
}
2019-08-09 13:41:56 -07:00
func ( gen * Generator ) processNextWorkItem ( ) bool {
2020-03-17 11:05:20 -07:00
logger := gen . log
2019-08-09 13:41:56 -07:00
obj , shutdown := gen . queue . Get ( )
2019-05-10 00:05:21 -07:00
if shutdown {
return false
}
2019-07-19 16:17:10 -07:00
2019-05-10 00:05:21 -07:00
err := func ( obj interface { } ) error {
2019-08-09 13:41:56 -07:00
defer gen . queue . Done ( obj )
2019-05-10 12:36:55 -07:00
var key Info
2019-05-10 00:05:21 -07:00
var ok bool
2019-07-19 16:17:10 -07:00
2019-05-10 12:36:55 -07:00
if key , ok = obj . ( Info ) ; ! ok {
2019-08-09 13:41:56 -07:00
gen . queue . Forget ( obj )
2020-03-17 11:05:20 -07:00
logger . Info ( "Incorrect type; expected type 'info'" , "obj" , obj )
2019-05-10 00:05:21 -07:00
return nil
}
2019-08-09 13:41:56 -07:00
err := gen . syncHandler ( key )
gen . handleErr ( err , obj )
2019-05-10 00:05:21 -07:00
return nil
} ( obj )
if err != nil {
2020-03-17 11:05:20 -07:00
logger . Error ( err , "failed to process next work item" )
2019-07-19 16:17:10 -07:00
return true
2019-05-10 00:05:21 -07:00
}
return true
}
2019-08-09 13:41:56 -07:00
func ( gen * Generator ) syncHandler ( key Info ) error {
2020-03-17 11:05:20 -07:00
logger := gen . log
2019-06-26 12:41:42 -07:00
var robj runtime . Object
2019-05-10 00:05:21 -07:00
var err error
switch key . Kind {
2019-09-12 15:04:35 -07:00
case "ClusterPolicy" :
2021-06-30 00:43:11 +03:00
robj , err = gen . cpLister . Get ( key . Name )
if err != nil {
logger . Error ( err , "failed to get cluster policy" , "name" , key . Name )
return err
}
case "Policy" :
robj , err = gen . pLister . Policies ( key . Namespace ) . Get ( key . Name )
2019-05-10 00:05:21 -07:00
if err != nil {
2020-03-17 11:05:20 -07:00
logger . Error ( err , "failed to get policy" , "name" , key . Name )
2019-05-10 00:05:21 -07:00
return err
}
default :
2022-01-21 18:36:05 +08:00
robj , err = gen . client . GetResource ( "" , key . Kind , key . Namespace , key . Name )
2019-05-10 00:05:21 -07:00
if err != nil {
2020-11-03 16:07:02 -08:00
if ! errors . IsNotFound ( err ) {
logger . Error ( err , "failed to get resource" , "kind" , key . Kind , "name" , key . Name , "namespace" , key . Namespace )
}
2019-05-10 00:05:21 -07:00
return err
}
}
2019-07-08 16:53:34 -07:00
2019-12-26 11:50:41 -08:00
// set the event type based on reason
eventType := v1 . EventTypeWarning
2021-06-30 00:43:11 +03:00
if key . Reason == PolicyApplied . String ( ) {
eventType = v1 . EventTypeNormal
}
2019-05-10 00:05:21 -07:00
2019-12-26 11:50:41 -08:00
// based on the source of event generation, use different event recorders
switch key . Source {
case AdmissionController :
gen . admissionCtrRecorder . Event ( robj , eventType , key . Reason , key . Message )
case PolicyController :
gen . policyCtrRecorder . Event ( robj , eventType , key . Reason , key . Message )
case GeneratePolicyController :
gen . genPolicyRecorder . Event ( robj , eventType , key . Reason , key . Message )
default :
2020-03-17 11:05:20 -07:00
logger . Info ( "info.source not defined for the request" )
2019-05-10 00:05:21 -07:00
}
2019-12-26 11:50:41 -08:00
return nil
2019-05-10 00:05:21 -07:00
}
2019-08-26 13:34:42 -07:00
2019-12-26 11:50:41 -08:00
//NewEvent builds a event creation request
func NewEvent (
2020-03-17 11:05:20 -07:00
log logr . Logger ,
2019-08-26 13:34:42 -07:00
rkind ,
rapiVersion ,
rnamespace ,
rname ,
reason string ,
2019-12-26 11:50:41 -08:00
source Source ,
2019-08-26 13:34:42 -07:00
message MsgKey ,
args ... interface { } ) Info {
msgText , err := getEventMsg ( message , args ... )
if err != nil {
2020-03-17 11:05:20 -07:00
log . Error ( err , "failed to get event message" )
2019-08-26 13:34:42 -07:00
}
return Info {
Kind : rkind ,
Name : rname ,
Namespace : rnamespace ,
Reason : reason ,
2019-12-26 11:50:41 -08:00
Source : source ,
2019-08-26 13:34:42 -07:00
Message : msgText ,
}
}