mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-05 15:37:19 +00:00
refactor: events controller (#9236)
* refactor: make events controller shutdown graceful Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * nit Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * drain Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * refactor: events controller Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * exception Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * remove queue Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> --------- Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> Co-authored-by: shuting <shuting@nirmata.com>
This commit is contained in:
parent
b61a1f3d18
commit
b54e6230c5
10 changed files with 319 additions and 424 deletions
|
@ -138,11 +138,8 @@ func main() {
|
|||
}
|
||||
eventGenerator := event.NewEventGenerator(
|
||||
setup.KyvernoDynamicClient,
|
||||
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
||||
kyvernoInformer.Kyverno().V1().Policies(),
|
||||
maxQueuedEvents,
|
||||
emitEventsValues,
|
||||
logging.WithName("EventGenerator"),
|
||||
emitEventsValues...,
|
||||
)
|
||||
// this controller only subscribe to events, nothing is returned...
|
||||
var wg sync.WaitGroup
|
||||
|
@ -172,7 +169,7 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
// start event generator
|
||||
go eventGenerator.Run(signalCtx, 3, &wg)
|
||||
go eventGenerator.Run(signalCtx, event.Workers, &wg)
|
||||
// setup leader election
|
||||
le, err := leaderelection.New(
|
||||
setup.Logger.WithName("leader-election"),
|
||||
|
|
|
@ -132,11 +132,8 @@ func main() {
|
|||
kyvernoInformer.Kyverno().V2beta1().ClusterCleanupPolicies(),
|
||||
genericloggingcontroller.CheckGeneration,
|
||||
)
|
||||
eventGenerator := event.NewEventCleanupGenerator(
|
||||
eventGenerator := event.NewEventGenerator(
|
||||
setup.KyvernoDynamicClient,
|
||||
kyvernoInformer.Kyverno().V2beta1().ClusterCleanupPolicies(),
|
||||
kyvernoInformer.Kyverno().V2beta1().CleanupPolicies(),
|
||||
maxQueuedEvents,
|
||||
logging.WithName("EventGenerator"),
|
||||
)
|
||||
// start informers and wait for cache sync
|
||||
|
@ -145,7 +142,7 @@ func main() {
|
|||
}
|
||||
// start event generator
|
||||
var wg sync.WaitGroup
|
||||
go eventGenerator.Run(ctx, 3, &wg)
|
||||
go eventGenerator.Run(ctx, event.CleanupWorkers, &wg)
|
||||
// setup leader election
|
||||
le, err := leaderelection.New(
|
||||
setup.Logger.WithName("leader-election"),
|
||||
|
|
|
@ -322,11 +322,8 @@ func main() {
|
|||
}
|
||||
eventGenerator := event.NewEventGenerator(
|
||||
setup.KyvernoDynamicClient,
|
||||
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
||||
kyvernoInformer.Kyverno().V1().Policies(),
|
||||
maxQueuedEvents,
|
||||
omitEventsValues,
|
||||
logging.WithName("EventGenerator"),
|
||||
omitEventsValues...,
|
||||
)
|
||||
// this controller only subscribe to events, nothing is returned...
|
||||
policymetricscontroller.NewController(
|
||||
|
@ -393,7 +390,7 @@ func main() {
|
|||
}
|
||||
}
|
||||
// start event generator
|
||||
go eventGenerator.Run(signalCtx, 3, &wg)
|
||||
go eventGenerator.Run(signalCtx, event.Workers, &wg)
|
||||
// setup leader election
|
||||
le, err := leaderelection.New(
|
||||
setup.Logger.WithName("leader-election"),
|
||||
|
|
|
@ -255,11 +255,8 @@ func main() {
|
|||
}
|
||||
eventGenerator := event.NewEventGenerator(
|
||||
setup.KyvernoDynamicClient,
|
||||
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
||||
kyvernoInformer.Kyverno().V1().Policies(),
|
||||
maxQueuedEvents,
|
||||
omitEventsValues,
|
||||
logging.WithName("EventGenerator"),
|
||||
omitEventsValues...,
|
||||
)
|
||||
// engine
|
||||
engine := internal.NewEngine(
|
||||
|
@ -283,7 +280,7 @@ func main() {
|
|||
}
|
||||
// start event generator
|
||||
var wg sync.WaitGroup
|
||||
go eventGenerator.Run(ctx, 3, &wg)
|
||||
go eventGenerator.Run(ctx, event.Workers, &wg)
|
||||
// setup leader election
|
||||
le, err := leaderelection.New(
|
||||
setup.Logger.WithName("leader-election"),
|
||||
|
|
|
@ -5,8 +5,8 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/kyverno/kyverno/pkg/config"
|
||||
kubeutils "github.com/kyverno/kyverno/pkg/utils/kube"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
@ -27,6 +27,25 @@ type fixture struct {
|
|||
client Interface
|
||||
}
|
||||
|
||||
func newUnstructured(apiVersion, kind, namespace, name string) *unstructured.Unstructured {
|
||||
return &unstructured.Unstructured{
|
||||
Object: map[string]interface{}{
|
||||
"apiVersion": apiVersion,
|
||||
"kind": kind,
|
||||
"metadata": map[string]interface{}{
|
||||
"namespace": namespace,
|
||||
"name": name,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newUnstructuredWithSpec(apiVersion, kind, namespace, name string, spec map[string]interface{}) *unstructured.Unstructured {
|
||||
u := newUnstructured(apiVersion, kind, namespace, name)
|
||||
u.Object["spec"] = spec
|
||||
return u
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T) *fixture {
|
||||
// init groupversion
|
||||
regResource := []schema.GroupVersionResource{
|
||||
|
@ -44,12 +63,12 @@ func newFixture(t *testing.T) *fixture {
|
|||
}
|
||||
|
||||
objects := []runtime.Object{
|
||||
kubeutils.NewUnstructured("group/version", "TheKind", "ns-foo", "name-foo"),
|
||||
kubeutils.NewUnstructured("group2/version", "TheKind", "ns-foo", "name2-foo"),
|
||||
kubeutils.NewUnstructured("group/version", "TheKind", "ns-foo", "name-bar"),
|
||||
kubeutils.NewUnstructured("group/version", "TheKind", "ns-foo", "name-baz"),
|
||||
kubeutils.NewUnstructured("group2/version", "TheKind", "ns-foo", "name2-baz"),
|
||||
kubeutils.NewUnstructured("apps/v1", "Deployment", config.KyvernoNamespace(), config.KyvernoDeploymentName()),
|
||||
newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"),
|
||||
newUnstructured("group2/version", "TheKind", "ns-foo", "name2-foo"),
|
||||
newUnstructured("group/version", "TheKind", "ns-foo", "name-bar"),
|
||||
newUnstructured("group/version", "TheKind", "ns-foo", "name-baz"),
|
||||
newUnstructured("group2/version", "TheKind", "ns-foo", "name2-baz"),
|
||||
newUnstructured("apps/v1", "Deployment", config.KyvernoNamespace(), config.KyvernoDeploymentName()),
|
||||
}
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
|
@ -89,17 +108,17 @@ func TestCRUDResource(t *testing.T) {
|
|||
t.Errorf("DeleteResouce not working: %s", err)
|
||||
}
|
||||
// CreateResource
|
||||
_, err = f.client.CreateResource(context.TODO(), "", "thekind", "ns-foo", kubeutils.NewUnstructured("group/version", "TheKind", "ns-foo", "name-foo1"), false)
|
||||
_, err = f.client.CreateResource(context.TODO(), "", "thekind", "ns-foo", newUnstructured("group/version", "TheKind", "ns-foo", "name-foo1"), false)
|
||||
if err != nil {
|
||||
t.Errorf("CreateResource not working: %s", err)
|
||||
}
|
||||
// UpdateResource
|
||||
_, err = f.client.UpdateResource(context.TODO(), "", "thekind", "ns-foo", kubeutils.NewUnstructuredWithSpec("group/version", "TheKind", "ns-foo", "name-foo1", map[string]interface{}{"foo": "bar"}), false)
|
||||
_, err = f.client.UpdateResource(context.TODO(), "", "thekind", "ns-foo", newUnstructuredWithSpec("group/version", "TheKind", "ns-foo", "name-foo1", map[string]interface{}{"foo": "bar"}), false)
|
||||
if err != nil {
|
||||
t.Errorf("UpdateResource not working: %s", err)
|
||||
}
|
||||
// UpdateStatusResource
|
||||
_, err = f.client.UpdateStatusResource(context.TODO(), "", "thekind", "ns-foo", kubeutils.NewUnstructuredWithSpec("group/version", "TheKind", "ns-foo", "name-foo1", map[string]interface{}{"foo": "status"}), false)
|
||||
_, err = f.client.UpdateStatusResource(context.TODO(), "", "thekind", "ns-foo", newUnstructuredWithSpec("group/version", "TheKind", "ns-foo", "name-foo1", map[string]interface{}{"foo": "status"}), false)
|
||||
if err != nil {
|
||||
t.Errorf("UpdateStatusResource not working: %s", err)
|
||||
}
|
||||
|
|
|
@ -3,58 +3,40 @@ package event
|
|||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
|
||||
kyvernov2beta1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v2beta1"
|
||||
kyvernov1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
|
||||
kyvernov2beta1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v2beta1"
|
||||
"github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme"
|
||||
"github.com/kyverno/kyverno/pkg/clients/dclient"
|
||||
kubeutils "github.com/kyverno/kyverno/pkg/utils/kube"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
errors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/tools/events"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
Workers = 3
|
||||
CleanupWorkers = 3
|
||||
eventWorkQueueName = "kyverno-events"
|
||||
workQueueRetryLimit = 3
|
||||
)
|
||||
|
||||
// generator generate events
|
||||
type generator struct {
|
||||
client dclient.Interface
|
||||
// list/get cluster policy
|
||||
cpLister kyvernov1listers.ClusterPolicyLister
|
||||
// list/get policy
|
||||
pLister kyvernov1listers.PolicyLister
|
||||
// list/get cluster cleanup policy
|
||||
clustercleanuppolLister kyvernov2beta1listers.ClusterCleanupPolicyLister
|
||||
// list/get cleanup policy
|
||||
cleanuppolLister kyvernov2beta1listers.CleanupPolicyLister
|
||||
// queue to store event generation requests
|
||||
queue workqueue.RateLimitingInterface
|
||||
// events generated at policy controller
|
||||
policyCtrRecorder events.EventRecorder
|
||||
// events generated at admission control
|
||||
admissionCtrRecorder events.EventRecorder
|
||||
// events generated at namespaced policy controller to process 'generate' rule
|
||||
genPolicyRecorder events.EventRecorder
|
||||
// events generated at mutateExisting controller
|
||||
mutateExistingRecorder events.EventRecorder
|
||||
// events generated at cleanup controller
|
||||
cleanupPolicyRecorder events.EventRecorder
|
||||
// broadcaster
|
||||
broadcaster events.EventBroadcaster
|
||||
|
||||
maxQueuedEvents int
|
||||
// recorders
|
||||
recorders map[Source]events.EventRecorder
|
||||
|
||||
omitEvents []string
|
||||
// config
|
||||
omitEvents sets.Set[string]
|
||||
logger logr.Logger
|
||||
}
|
||||
|
||||
log logr.Logger
|
||||
// Interface to generate event
|
||||
type Interface interface {
|
||||
Add(infoList ...Info)
|
||||
}
|
||||
|
||||
// Controller interface to generate event
|
||||
|
@ -63,214 +45,84 @@ type Controller interface {
|
|||
Run(context.Context, int, *sync.WaitGroup)
|
||||
}
|
||||
|
||||
// Interface to generate event
|
||||
type Interface interface {
|
||||
Add(infoList ...Info)
|
||||
}
|
||||
|
||||
// NewEventGenerator to generate a new event controller
|
||||
func NewEventGenerator(
|
||||
// source Source,
|
||||
client dclient.Interface,
|
||||
cpInformer kyvernov1informers.ClusterPolicyInformer,
|
||||
pInformer kyvernov1informers.PolicyInformer,
|
||||
maxQueuedEvents int,
|
||||
omitEvents []string,
|
||||
log logr.Logger,
|
||||
) Controller {
|
||||
gen := generator{
|
||||
client: client,
|
||||
cpLister: cpInformer.Lister(),
|
||||
pLister: pInformer.Lister(),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter(), eventWorkQueueName),
|
||||
policyCtrRecorder: NewRecorder(PolicyController, client.GetEventsInterface()),
|
||||
admissionCtrRecorder: NewRecorder(AdmissionController, client.GetEventsInterface()),
|
||||
genPolicyRecorder: NewRecorder(GeneratePolicyController, client.GetEventsInterface()),
|
||||
mutateExistingRecorder: NewRecorder(MutateExistingController, client.GetEventsInterface()),
|
||||
maxQueuedEvents: maxQueuedEvents,
|
||||
omitEvents: omitEvents,
|
||||
log: log,
|
||||
func NewEventGenerator(client dclient.Interface, logger logr.Logger, omitEvents ...string) Controller {
|
||||
return &generator{
|
||||
broadcaster: events.NewBroadcaster(&events.EventSinkImpl{
|
||||
Interface: client.GetEventsInterface(),
|
||||
}),
|
||||
omitEvents: sets.New(omitEvents...),
|
||||
logger: logger,
|
||||
}
|
||||
return &gen
|
||||
}
|
||||
|
||||
// NewEventGenerator to generate a new event cleanup controller
|
||||
func NewEventCleanupGenerator(
|
||||
// source Source,
|
||||
client dclient.Interface,
|
||||
clustercleanuppolInformer kyvernov2beta1informers.ClusterCleanupPolicyInformer,
|
||||
cleanuppolInformer kyvernov2beta1informers.CleanupPolicyInformer,
|
||||
maxQueuedEvents int,
|
||||
log logr.Logger,
|
||||
) Controller {
|
||||
gen := generator{
|
||||
client: client,
|
||||
clustercleanuppolLister: clustercleanuppolInformer.Lister(),
|
||||
cleanuppolLister: cleanuppolInformer.Lister(),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter(), eventWorkQueueName),
|
||||
cleanupPolicyRecorder: NewRecorder(CleanupController, client.GetEventsInterface()),
|
||||
maxQueuedEvents: maxQueuedEvents,
|
||||
log: log,
|
||||
}
|
||||
return &gen
|
||||
}
|
||||
|
||||
// Add queues an event for generation
|
||||
func (gen *generator) Add(infos ...Info) {
|
||||
logger := gen.log
|
||||
logger := gen.logger
|
||||
logger.V(3).Info("generating events", "count", len(infos))
|
||||
if gen.maxQueuedEvents == 0 || gen.queue.Len() > gen.maxQueuedEvents {
|
||||
logger.V(2).Info("exceeds the event queue limit, dropping the event", "maxQueuedEvents", gen.maxQueuedEvents, "current size", gen.queue.Len())
|
||||
return
|
||||
}
|
||||
for _, info := range infos {
|
||||
if info.Name == "" {
|
||||
// dont create event for resources with generateName
|
||||
// as the name is not generated yet
|
||||
logger.V(3).Info("skipping event creation for resource without a name", "kind", info.Kind, "name", info.Name, "namespace", info.Namespace)
|
||||
// don't create event for resources with generateName as the name is not generated yet
|
||||
if info.Regarding.Name == "" {
|
||||
logger.V(3).Info("skipping event creation for resource without a name", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace)
|
||||
continue
|
||||
}
|
||||
|
||||
shouldEmitEvent := true
|
||||
for _, eventReason := range gen.omitEvents {
|
||||
if info.Reason == Reason(eventReason) {
|
||||
shouldEmitEvent = false
|
||||
logger.V(6).Info("omitting event", "kind", info.Kind, "name", info.Name, "namespace", info.Namespace, "reason", info.Reason)
|
||||
}
|
||||
}
|
||||
|
||||
if shouldEmitEvent {
|
||||
gen.queue.Add(info)
|
||||
logger.V(6).Info("creating event", "kind", info.Kind, "name", info.Name, "namespace", info.Namespace, "reason", info.Reason)
|
||||
if gen.omitEvents.Has(string(info.Reason)) {
|
||||
logger.V(6).Info("omitting event", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace, "reason", info.Reason)
|
||||
continue
|
||||
}
|
||||
gen.emitEvent(info)
|
||||
logger.V(6).Info("creating event", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace, "reason", info.Reason)
|
||||
}
|
||||
}
|
||||
|
||||
// Run begins generator
|
||||
func (gen *generator) Run(ctx context.Context, workers int, waitGroup *sync.WaitGroup) {
|
||||
logger := gen.log
|
||||
logger := gen.logger
|
||||
logger.Info("start")
|
||||
defer logger.Info("shutting down")
|
||||
defer logger.Info("terminated")
|
||||
defer utilruntime.HandleCrash()
|
||||
defer gen.queue.ShutDown()
|
||||
for i := 0; i < workers; i++ {
|
||||
waitGroup.Add(1)
|
||||
go func() {
|
||||
defer waitGroup.Done()
|
||||
wait.UntilWithContext(ctx, gen.runWorker, time.Second)
|
||||
}()
|
||||
defer gen.stopRecorders()
|
||||
defer logger.Info("shutting down...")
|
||||
if err := gen.startRecorders(ctx); err != nil {
|
||||
logger.Error(err, "failed to start recorders")
|
||||
return
|
||||
}
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (gen *generator) runWorker(ctx context.Context) {
|
||||
for gen.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (gen *generator) handleErr(err error, key interface{}) {
|
||||
logger := gen.log
|
||||
if err == nil {
|
||||
gen.queue.Forget(key)
|
||||
return
|
||||
}
|
||||
// This controller retries if something goes wrong. After that, it stops trying.
|
||||
if gen.queue.NumRequeues(key) < workQueueRetryLimit {
|
||||
logger.V(4).Info("retrying event generation", "key", key, "reason", err.Error())
|
||||
// Re-enqueue the key rate limited. Based on the rate limiter on the
|
||||
// queue and the re-enqueue history, the key will be processed later again.
|
||||
gen.queue.AddRateLimited(key)
|
||||
return
|
||||
}
|
||||
gen.queue.Forget(key)
|
||||
if !errors.IsNotFound(err) {
|
||||
logger.Error(err, "failed to generate event", "key", key)
|
||||
}
|
||||
}
|
||||
|
||||
func (gen *generator) processNextWorkItem() bool {
|
||||
obj, shutdown := gen.queue.Get()
|
||||
if shutdown {
|
||||
return false
|
||||
}
|
||||
defer gen.queue.Done(obj)
|
||||
var key Info
|
||||
var ok bool
|
||||
if key, ok = obj.(Info); !ok {
|
||||
gen.queue.Forget(obj)
|
||||
gen.log.V(2).Info("Incorrect type; expected type 'info'", "obj", obj)
|
||||
return true
|
||||
}
|
||||
err := gen.syncHandler(key)
|
||||
gen.handleErr(err, obj)
|
||||
return true
|
||||
}
|
||||
|
||||
func (gen *generator) syncHandler(key Info) error {
|
||||
logger := gen.log
|
||||
var regardingObj, relatedObj runtime.Object
|
||||
var err error
|
||||
switch key.Kind {
|
||||
case "ClusterPolicy":
|
||||
regardingObj, err = gen.cpLister.Get(key.Name)
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to get cluster policy", "name", key.Name)
|
||||
func (gen *generator) startRecorders(ctx context.Context) error {
|
||||
if err := gen.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
case "Policy":
|
||||
regardingObj, err = gen.pLister.Policies(key.Namespace).Get(key.Name)
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to get policy", "name", key.Name)
|
||||
logger := klog.Background().V(int(0))
|
||||
// TODO: logger watcher should be stopped
|
||||
if _, err := gen.broadcaster.StartLogging(logger); err != nil {
|
||||
return err
|
||||
}
|
||||
case "ClusterCleanupPolicy":
|
||||
regardingObj, err = gen.clustercleanuppolLister.Get(key.Name)
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to get cluster clean up policy", "name", key.Name)
|
||||
return err
|
||||
gen.recorders = map[Source]events.EventRecorder{
|
||||
PolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(PolicyController)),
|
||||
AdmissionController: gen.broadcaster.NewRecorder(scheme.Scheme, string(AdmissionController)),
|
||||
GeneratePolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(GeneratePolicyController)),
|
||||
MutateExistingController: gen.broadcaster.NewRecorder(scheme.Scheme, string(MutateExistingController)),
|
||||
CleanupController: gen.broadcaster.NewRecorder(scheme.Scheme, string(CleanupController)),
|
||||
}
|
||||
case "CleanupPolicy":
|
||||
regardingObj, err = gen.cleanuppolLister.CleanupPolicies(key.Namespace).Get(key.Name)
|
||||
if err != nil {
|
||||
logger.Error(err, "failed to get cleanup policy", "name", key.Name)
|
||||
return err
|
||||
}
|
||||
default:
|
||||
regardingObj, err = gen.client.GetResource(context.TODO(), "", key.Kind, key.Namespace, key.Name)
|
||||
if err != nil {
|
||||
if !errors.IsNotFound(err) {
|
||||
logger.Error(err, "failed to get resource", "kind", key.Kind, "name", key.Name, "namespace", key.Namespace)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (gen *generator) stopRecorders() {
|
||||
gen.broadcaster.Shutdown()
|
||||
}
|
||||
|
||||
relatedObj = kubeutils.NewUnstructured(key.RelatedAPIVersion, key.RelatedKind, key.RelatedNamespace, key.RelatedName)
|
||||
|
||||
// set the event type based on reason
|
||||
// if skip/pass, reason will be: NORMAL
|
||||
// else reason will be: WARNING
|
||||
func (gen *generator) emitEvent(key Info) {
|
||||
logger := gen.logger
|
||||
eventType := corev1.EventTypeWarning
|
||||
if key.Reason == PolicyApplied || key.Reason == PolicySkipped {
|
||||
eventType = corev1.EventTypeNormal
|
||||
}
|
||||
|
||||
if recorder := gen.recorders[key.Source]; recorder != nil {
|
||||
logger.V(3).Info("creating the event", "source", key.Source, "type", eventType, "resource", key.Resource())
|
||||
// based on the source of event generation, use different event recorders
|
||||
switch key.Source {
|
||||
case AdmissionController:
|
||||
gen.admissionCtrRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message)
|
||||
case PolicyController:
|
||||
gen.policyCtrRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message)
|
||||
case GeneratePolicyController:
|
||||
gen.genPolicyRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message)
|
||||
case MutateExistingController:
|
||||
gen.mutateExistingRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message)
|
||||
case CleanupController:
|
||||
gen.cleanupPolicyRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message)
|
||||
default:
|
||||
recorder.Eventf(&key.Regarding, key.Related, eventType, string(key.Reason), string(key.Action), key.Message)
|
||||
} else {
|
||||
logger.Info("info.source not defined for the request")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -7,7 +7,9 @@ import (
|
|||
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
|
||||
kyvernov2alpha1 "github.com/kyverno/kyverno/api/kyverno/v2alpha1"
|
||||
engineapi "github.com/kyverno/kyverno/pkg/engine/api"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
func NewPolicyFailEvent(source Source, reason Reason, engineResponse engineapi.EngineResponse, ruleResp engineapi.RuleResponse, blocked bool) Info {
|
||||
|
@ -15,17 +17,25 @@ func NewPolicyFailEvent(source Source, reason Reason, engineResponse engineapi.E
|
|||
if blocked {
|
||||
action = ResourceBlocked
|
||||
}
|
||||
|
||||
pol := engineResponse.Policy()
|
||||
|
||||
return Info{
|
||||
regarding := corev1.ObjectReference{
|
||||
// TODO: iirc it's not safe to assume api version is set
|
||||
APIVersion: "kyverno.io/v1",
|
||||
Kind: pol.GetKind(),
|
||||
Name: pol.GetName(),
|
||||
Namespace: pol.GetNamespace(),
|
||||
RelatedAPIVersion: engineResponse.GetResourceSpec().APIVersion,
|
||||
RelatedKind: engineResponse.GetResourceSpec().Kind,
|
||||
RelatedName: engineResponse.GetResourceSpec().Name,
|
||||
RelatedNamespace: engineResponse.GetResourceSpec().Namespace,
|
||||
UID: pol.MetaObject().GetUID(),
|
||||
}
|
||||
related := engineResponse.GetResourceSpec()
|
||||
return Info{
|
||||
Regarding: regarding,
|
||||
Related: &corev1.ObjectReference{
|
||||
APIVersion: related.APIVersion,
|
||||
Kind: related.Kind,
|
||||
Name: related.Name,
|
||||
Namespace: related.Namespace,
|
||||
UID: types.UID(related.UID),
|
||||
},
|
||||
Reason: reason,
|
||||
Source: source,
|
||||
Message: buildPolicyEventMessage(ruleResp, engineResponse.GetResourceSpec(), blocked),
|
||||
|
@ -83,15 +93,24 @@ func NewPolicyAppliedEvent(source Source, engineResponse engineapi.EngineRespons
|
|||
fmt.Fprintf(&bldr, "%s: pass", res)
|
||||
action = ResourcePassed
|
||||
}
|
||||
|
||||
return Info{
|
||||
regarding := corev1.ObjectReference{
|
||||
// TODO: iirc it's not safe to assume api version is set
|
||||
APIVersion: "kyverno.io/v1",
|
||||
Kind: policy.GetKind(),
|
||||
Name: policy.GetName(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
RelatedAPIVersion: resource.GetAPIVersion(),
|
||||
RelatedKind: resource.GetKind(),
|
||||
RelatedName: resource.GetName(),
|
||||
RelatedNamespace: resource.GetNamespace(),
|
||||
UID: policy.MetaObject().GetUID(),
|
||||
}
|
||||
related := engineResponse.GetResourceSpec()
|
||||
return Info{
|
||||
Regarding: regarding,
|
||||
Related: &corev1.ObjectReference{
|
||||
APIVersion: related.APIVersion,
|
||||
Kind: related.Kind,
|
||||
Name: related.Name,
|
||||
Namespace: related.Namespace,
|
||||
UID: types.UID(related.UID),
|
||||
},
|
||||
Reason: PolicyApplied,
|
||||
Source: source,
|
||||
Message: bldr.String(),
|
||||
|
@ -107,11 +126,15 @@ func NewResourceViolationEvent(source Source, reason Reason, engineResponse engi
|
|||
fmt.Fprintf(&bldr, "policy %s/%s %s: %s", pol.GetName(),
|
||||
ruleResp.Name(), ruleResp.Status(), ruleResp.Message())
|
||||
resource := engineResponse.GetResourceSpec()
|
||||
|
||||
return Info{
|
||||
regarding := corev1.ObjectReference{
|
||||
APIVersion: resource.APIVersion,
|
||||
Kind: resource.Kind,
|
||||
Name: resource.Name,
|
||||
Namespace: resource.Namespace,
|
||||
UID: types.UID(resource.UID),
|
||||
}
|
||||
return Info{
|
||||
Regarding: regarding,
|
||||
Reason: reason,
|
||||
Source: source,
|
||||
Message: bldr.String(),
|
||||
|
@ -121,11 +144,15 @@ func NewResourceViolationEvent(source Source, reason Reason, engineResponse engi
|
|||
|
||||
func NewResourceGenerationEvent(policy, rule string, source Source, resource kyvernov1.ResourceSpec) Info {
|
||||
msg := fmt.Sprintf("Created %s %s as a result of applying policy %s/%s", resource.GetKind(), resource.GetName(), policy, rule)
|
||||
|
||||
regarding := corev1.ObjectReference{
|
||||
APIVersion: resource.APIVersion,
|
||||
Kind: resource.Kind,
|
||||
Name: resource.Name,
|
||||
Namespace: resource.Namespace,
|
||||
UID: resource.UID,
|
||||
}
|
||||
return Info{
|
||||
Kind: resource.GetKind(),
|
||||
Namespace: resource.GetNamespace(),
|
||||
Name: resource.GetName(),
|
||||
Regarding: regarding,
|
||||
Source: source,
|
||||
Reason: PolicyApplied,
|
||||
Message: msg,
|
||||
|
@ -135,14 +162,23 @@ func NewResourceGenerationEvent(policy, rule string, source Source, resource kyv
|
|||
|
||||
func NewBackgroundFailedEvent(err error, policy kyvernov1.PolicyInterface, rule string, source Source, resource kyvernov1.ResourceSpec) []Info {
|
||||
var events []Info
|
||||
events = append(events, Info{
|
||||
regarding := corev1.ObjectReference{
|
||||
// TODO: iirc it's not safe to assume api version is set
|
||||
APIVersion: "kyverno.io/v1",
|
||||
Kind: policy.GetKind(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
Name: policy.GetName(),
|
||||
RelatedAPIVersion: resource.GetAPIVersion(),
|
||||
RelatedKind: resource.GetKind(),
|
||||
RelatedNamespace: resource.GetNamespace(),
|
||||
RelatedName: resource.GetName(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
UID: policy.GetUID(),
|
||||
}
|
||||
events = append(events, Info{
|
||||
Regarding: regarding,
|
||||
Related: &corev1.ObjectReference{
|
||||
APIVersion: resource.APIVersion,
|
||||
Kind: resource.Kind,
|
||||
Name: resource.Name,
|
||||
Namespace: resource.Namespace,
|
||||
UID: resource.UID,
|
||||
},
|
||||
Source: source,
|
||||
Reason: PolicyError,
|
||||
Message: fmt.Sprintf("policy %s/%s error: %v", policy.GetName(), rule, err),
|
||||
|
@ -156,21 +192,28 @@ func NewBackgroundSuccessEvent(source Source, policy kyvernov1.PolicyInterface,
|
|||
var events []Info
|
||||
msg := "resource generated"
|
||||
action := ResourceGenerated
|
||||
|
||||
if source == MutateExistingController {
|
||||
msg = "resource mutated"
|
||||
action = ResourceMutated
|
||||
}
|
||||
|
||||
regarding := corev1.ObjectReference{
|
||||
// TODO: iirc it's not safe to assume api version is set
|
||||
APIVersion: "kyverno.io/v1",
|
||||
Kind: policy.GetKind(),
|
||||
Name: policy.GetName(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
UID: policy.GetUID(),
|
||||
}
|
||||
for _, res := range resources {
|
||||
events = append(events, Info{
|
||||
Kind: policy.GetKind(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
Name: policy.GetName(),
|
||||
RelatedAPIVersion: res.GetAPIVersion(),
|
||||
RelatedKind: res.GetKind(),
|
||||
RelatedNamespace: res.GetNamespace(),
|
||||
RelatedName: res.GetName(),
|
||||
Regarding: regarding,
|
||||
Related: &corev1.ObjectReference{
|
||||
APIVersion: res.APIVersion,
|
||||
Kind: res.Kind,
|
||||
Name: res.Name,
|
||||
Namespace: res.Namespace,
|
||||
UID: res.UID,
|
||||
},
|
||||
Source: source,
|
||||
Reason: PolicyApplied,
|
||||
Message: msg,
|
||||
|
@ -192,27 +235,45 @@ func NewPolicyExceptionEvents(engineResponse engineapi.EngineResponse, ruleResp
|
|||
} else {
|
||||
exceptionMessage = fmt.Sprintf("resource %s was skipped from policy rule %s/%s/%s", resourceKey(engineResponse.PatchedResource), pol.GetNamespace(), pol.GetName(), ruleResp.Name())
|
||||
}
|
||||
policyEvent := Info{
|
||||
regarding := corev1.ObjectReference{
|
||||
// TODO: iirc it's not safe to assume api version is set
|
||||
APIVersion: "kyverno.io/v1",
|
||||
Kind: pol.GetKind(),
|
||||
Name: pol.GetName(),
|
||||
Namespace: pol.GetNamespace(),
|
||||
RelatedAPIVersion: engineResponse.PatchedResource.GetAPIVersion(),
|
||||
RelatedKind: engineResponse.PatchedResource.GetKind(),
|
||||
RelatedName: engineResponse.PatchedResource.GetName(),
|
||||
RelatedNamespace: engineResponse.PatchedResource.GetNamespace(),
|
||||
UID: pol.GetUID(),
|
||||
}
|
||||
related := engineResponse.GetResourceSpec()
|
||||
policyEvent := Info{
|
||||
Regarding: regarding,
|
||||
Related: &corev1.ObjectReference{
|
||||
APIVersion: related.APIVersion,
|
||||
Kind: related.Kind,
|
||||
Name: related.Name,
|
||||
Namespace: related.Namespace,
|
||||
UID: types.UID(related.UID),
|
||||
},
|
||||
Reason: PolicySkipped,
|
||||
Message: policyMessage,
|
||||
Source: source,
|
||||
Action: ResourcePassed,
|
||||
}
|
||||
exceptionEvent := Info{
|
||||
Regarding: corev1.ObjectReference{
|
||||
// TODO: iirc it's not safe to assume api version is set
|
||||
APIVersion: "kyverno.io/v2",
|
||||
Kind: "PolicyException",
|
||||
Name: exceptionName,
|
||||
Namespace: exceptionNamespace,
|
||||
RelatedAPIVersion: engineResponse.PatchedResource.GetAPIVersion(),
|
||||
RelatedKind: engineResponse.PatchedResource.GetKind(),
|
||||
RelatedName: engineResponse.PatchedResource.GetName(),
|
||||
RelatedNamespace: engineResponse.PatchedResource.GetNamespace(),
|
||||
UID: exception.GetUID(),
|
||||
},
|
||||
Related: &corev1.ObjectReference{
|
||||
APIVersion: related.APIVersion,
|
||||
Kind: related.Kind,
|
||||
Name: related.Name,
|
||||
Namespace: related.Namespace,
|
||||
UID: types.UID(related.UID),
|
||||
},
|
||||
Reason: PolicySkipped,
|
||||
Message: exceptionMessage,
|
||||
Source: source,
|
||||
|
@ -222,15 +283,24 @@ func NewPolicyExceptionEvents(engineResponse engineapi.EngineResponse, ruleResp
|
|||
}
|
||||
|
||||
func NewCleanupPolicyEvent(policy kyvernov2alpha1.CleanupPolicyInterface, resource unstructured.Unstructured, err error) Info {
|
||||
regarding := corev1.ObjectReference{
|
||||
// TODO: iirc it's not safe to assume api version is set
|
||||
APIVersion: "kyverno.io/v2beta1",
|
||||
Kind: policy.GetKind(),
|
||||
Name: policy.GetName(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
UID: policy.GetUID(),
|
||||
}
|
||||
related := &corev1.ObjectReference{
|
||||
APIVersion: resource.GetAPIVersion(),
|
||||
Kind: resource.GetKind(),
|
||||
Namespace: resource.GetNamespace(),
|
||||
Name: resource.GetName(),
|
||||
}
|
||||
if err == nil {
|
||||
return Info{
|
||||
Kind: policy.GetKind(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
Name: policy.GetName(),
|
||||
RelatedAPIVersion: resource.GetAPIVersion(),
|
||||
RelatedKind: resource.GetKind(),
|
||||
RelatedNamespace: resource.GetNamespace(),
|
||||
RelatedName: resource.GetName(),
|
||||
Regarding: regarding,
|
||||
Related: related,
|
||||
Source: CleanupController,
|
||||
Action: ResourceCleanedUp,
|
||||
Reason: PolicyApplied,
|
||||
|
@ -238,13 +308,8 @@ func NewCleanupPolicyEvent(policy kyvernov2alpha1.CleanupPolicyInterface, resour
|
|||
}
|
||||
} else {
|
||||
return Info{
|
||||
Kind: policy.GetKind(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
Name: policy.GetName(),
|
||||
RelatedAPIVersion: resource.GetAPIVersion(),
|
||||
RelatedKind: resource.GetKind(),
|
||||
RelatedNamespace: resource.GetNamespace(),
|
||||
RelatedName: resource.GetName(),
|
||||
Regarding: regarding,
|
||||
Related: related,
|
||||
Source: CleanupController,
|
||||
Action: None,
|
||||
Reason: PolicyError,
|
||||
|
@ -254,25 +319,33 @@ func NewCleanupPolicyEvent(policy kyvernov2alpha1.CleanupPolicyInterface, resour
|
|||
}
|
||||
|
||||
func NewValidatingAdmissionPolicyEvent(policy kyvernov1.PolicyInterface, vapName, vapBindingName string) []Info {
|
||||
vapEvent := Info{
|
||||
regarding := corev1.ObjectReference{
|
||||
// TODO: iirc it's not safe to assume api version is set
|
||||
APIVersion: "kyverno.io/v1",
|
||||
Kind: policy.GetKind(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
Name: policy.GetName(),
|
||||
RelatedAPIVersion: "admissionregistration.k8s.io/v1alpha1",
|
||||
RelatedKind: "ValidatingAdmissionPolicy",
|
||||
RelatedName: vapName,
|
||||
Namespace: policy.GetNamespace(),
|
||||
UID: policy.GetUID(),
|
||||
}
|
||||
vapEvent := Info{
|
||||
Regarding: regarding,
|
||||
Related: &corev1.ObjectReference{
|
||||
APIVersion: "admissionregistration.k8s.io/v1alpha1",
|
||||
Kind: "ValidatingAdmissionPolicy",
|
||||
Name: vapName,
|
||||
},
|
||||
Source: GeneratePolicyController,
|
||||
Action: ResourceGenerated,
|
||||
Reason: PolicyApplied,
|
||||
Message: fmt.Sprintf("successfully generated validating admission policy %s from policy %s", vapName, policy.GetName()),
|
||||
}
|
||||
vapBindingEvent := Info{
|
||||
Kind: policy.GetKind(),
|
||||
Namespace: policy.GetNamespace(),
|
||||
Name: policy.GetName(),
|
||||
RelatedAPIVersion: "admissionregistration.k8s.io/v1alpha1",
|
||||
RelatedKind: "ValidatingAdmissionPolicyBinding",
|
||||
RelatedName: vapBindingName,
|
||||
Regarding: regarding,
|
||||
Related: &corev1.ObjectReference{
|
||||
APIVersion: "admissionregistration.k8s.io/v1alpha1",
|
||||
Kind: "ValidatingAdmissionPolicyBinding",
|
||||
Name: vapBindingName,
|
||||
},
|
||||
Source: GeneratePolicyController,
|
||||
Action: ResourceGenerated,
|
||||
Reason: PolicyApplied,
|
||||
|
@ -283,9 +356,13 @@ func NewValidatingAdmissionPolicyEvent(policy kyvernov1.PolicyInterface, vapName
|
|||
|
||||
func NewFailedEvent(err error, policy, rule string, source Source, resource kyvernov1.ResourceSpec) Info {
|
||||
return Info{
|
||||
Kind: resource.GetKind(),
|
||||
Namespace: resource.GetNamespace(),
|
||||
Name: resource.GetName(),
|
||||
Regarding: corev1.ObjectReference{
|
||||
APIVersion: resource.APIVersion,
|
||||
Kind: resource.Kind,
|
||||
Name: resource.Name,
|
||||
Namespace: resource.Namespace,
|
||||
UID: resource.UID,
|
||||
},
|
||||
Source: source,
|
||||
Reason: PolicyError,
|
||||
Message: fmt.Sprintf("policy %s/%s error: %v", policy, rule, err),
|
||||
|
|
|
@ -1,16 +1,15 @@
|
|||
package event
|
||||
|
||||
import "strings"
|
||||
import (
|
||||
"strings"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// Info defines the event details
|
||||
type Info struct {
|
||||
Kind string
|
||||
Name string
|
||||
Namespace string
|
||||
RelatedAPIVersion string
|
||||
RelatedKind string
|
||||
RelatedName string
|
||||
RelatedNamespace string
|
||||
Regarding corev1.ObjectReference
|
||||
Related *corev1.ObjectReference
|
||||
Reason Reason
|
||||
Message string
|
||||
Action Action
|
||||
|
@ -18,8 +17,8 @@ type Info struct {
|
|||
}
|
||||
|
||||
func (i *Info) Resource() string {
|
||||
if i.Namespace == "" {
|
||||
return strings.Join([]string{i.Kind, i.Name}, "/")
|
||||
if i.Regarding.Namespace == "" {
|
||||
return strings.Join([]string{i.Regarding.Kind, i.Regarding.Name}, "/")
|
||||
}
|
||||
return strings.Join([]string{i.Kind, i.Namespace, i.Name}, "/")
|
||||
return strings.Join([]string{i.Regarding.Kind, i.Regarding.Namespace, i.Regarding.Name}, "/")
|
||||
}
|
||||
|
|
|
@ -1,21 +0,0 @@
|
|||
package event
|
||||
|
||||
import (
|
||||
"github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
typedeventsv1 "k8s.io/client-go/kubernetes/typed/events/v1"
|
||||
"k8s.io/client-go/tools/events"
|
||||
)
|
||||
|
||||
func NewRecorder(source Source, sink typedeventsv1.EventsV1Interface) events.EventRecorder {
|
||||
utilruntime.Must(scheme.AddToScheme(scheme.Scheme))
|
||||
eventBroadcaster := events.NewBroadcaster(
|
||||
&events.EventSinkImpl{
|
||||
Interface: sink,
|
||||
},
|
||||
)
|
||||
eventBroadcaster.StartStructuredLogging(0)
|
||||
stopCh := make(chan struct{})
|
||||
eventBroadcaster.StartRecordingToSink(stopCh)
|
||||
return eventBroadcaster.NewRecorder(scheme.Scheme, string(source))
|
||||
}
|
|
@ -28,22 +28,3 @@ func ObjToUnstructured(obj interface{}) (*unstructured.Unstructured, error) {
|
|||
}
|
||||
return &unstructured.Unstructured{Object: unstrObj}, nil
|
||||
}
|
||||
|
||||
func NewUnstructured(apiVersion, kind, namespace, name string) *unstructured.Unstructured {
|
||||
return &unstructured.Unstructured{
|
||||
Object: map[string]interface{}{
|
||||
"apiVersion": apiVersion,
|
||||
"kind": kind,
|
||||
"metadata": map[string]interface{}{
|
||||
"namespace": namespace,
|
||||
"name": name,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func NewUnstructuredWithSpec(apiVersion, kind, namespace, name string, spec map[string]interface{}) *unstructured.Unstructured {
|
||||
u := NewUnstructured(apiVersion, kind, namespace, name)
|
||||
u.Object["spec"] = spec
|
||||
return u
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue