From b54e6230c5b2106ef380cfe7d65b575a2f508625 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Edouard=20Br=C3=A9t=C3=A9ch=C3=A9?= Date: Fri, 22 Dec 2023 11:47:22 +0100 Subject: [PATCH] refactor: events controller (#9236) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor: make events controller shutdown graceful Signed-off-by: Charles-Edouard Brétéché * nit Signed-off-by: Charles-Edouard Brétéché * drain Signed-off-by: Charles-Edouard Brétéché * refactor: events controller Signed-off-by: Charles-Edouard Brétéché * exception Signed-off-by: Charles-Edouard Brétéché * remove queue Signed-off-by: Charles-Edouard Brétéché --------- Signed-off-by: Charles-Edouard Brétéché Co-authored-by: shuting --- cmd/background-controller/main.go | 7 +- cmd/cleanup-controller/main.go | 7 +- cmd/kyverno/main.go | 7 +- cmd/reports-controller/main.go | 7 +- pkg/clients/dclient/client_test.go | 39 +++- pkg/event/controller.go | 274 ++++++------------------ pkg/event/events.go | 333 ++++++++++++++++++----------- pkg/event/info.go | 29 ++- pkg/event/recorder.go | 21 -- pkg/utils/kube/unstructured.go | 19 -- 10 files changed, 319 insertions(+), 424 deletions(-) delete mode 100644 pkg/event/recorder.go diff --git a/cmd/background-controller/main.go b/cmd/background-controller/main.go index ca86556a5c..c973299b6b 100644 --- a/cmd/background-controller/main.go +++ b/cmd/background-controller/main.go @@ -138,11 +138,8 @@ func main() { } eventGenerator := event.NewEventGenerator( setup.KyvernoDynamicClient, - kyvernoInformer.Kyverno().V1().ClusterPolicies(), - kyvernoInformer.Kyverno().V1().Policies(), - maxQueuedEvents, - emitEventsValues, logging.WithName("EventGenerator"), + emitEventsValues..., ) // this controller only subscribe to events, nothing is returned... var wg sync.WaitGroup @@ -172,7 +169,7 @@ func main() { os.Exit(1) } // start event generator - go eventGenerator.Run(signalCtx, 3, &wg) + go eventGenerator.Run(signalCtx, event.Workers, &wg) // setup leader election le, err := leaderelection.New( setup.Logger.WithName("leader-election"), diff --git a/cmd/cleanup-controller/main.go b/cmd/cleanup-controller/main.go index 1d2ad55774..20668c1601 100644 --- a/cmd/cleanup-controller/main.go +++ b/cmd/cleanup-controller/main.go @@ -132,11 +132,8 @@ func main() { kyvernoInformer.Kyverno().V2beta1().ClusterCleanupPolicies(), genericloggingcontroller.CheckGeneration, ) - eventGenerator := event.NewEventCleanupGenerator( + eventGenerator := event.NewEventGenerator( setup.KyvernoDynamicClient, - kyvernoInformer.Kyverno().V2beta1().ClusterCleanupPolicies(), - kyvernoInformer.Kyverno().V2beta1().CleanupPolicies(), - maxQueuedEvents, logging.WithName("EventGenerator"), ) // start informers and wait for cache sync @@ -145,7 +142,7 @@ func main() { } // start event generator var wg sync.WaitGroup - go eventGenerator.Run(ctx, 3, &wg) + go eventGenerator.Run(ctx, event.CleanupWorkers, &wg) // setup leader election le, err := leaderelection.New( setup.Logger.WithName("leader-election"), diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 6deb36eb6e..a30ce205f4 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -322,11 +322,8 @@ func main() { } eventGenerator := event.NewEventGenerator( setup.KyvernoDynamicClient, - kyvernoInformer.Kyverno().V1().ClusterPolicies(), - kyvernoInformer.Kyverno().V1().Policies(), - maxQueuedEvents, - omitEventsValues, logging.WithName("EventGenerator"), + omitEventsValues..., ) // this controller only subscribe to events, nothing is returned... policymetricscontroller.NewController( @@ -393,7 +390,7 @@ func main() { } } // start event generator - go eventGenerator.Run(signalCtx, 3, &wg) + go eventGenerator.Run(signalCtx, event.Workers, &wg) // setup leader election le, err := leaderelection.New( setup.Logger.WithName("leader-election"), diff --git a/cmd/reports-controller/main.go b/cmd/reports-controller/main.go index 6b3ca7c7d9..5f75a1c9b8 100644 --- a/cmd/reports-controller/main.go +++ b/cmd/reports-controller/main.go @@ -255,11 +255,8 @@ func main() { } eventGenerator := event.NewEventGenerator( setup.KyvernoDynamicClient, - kyvernoInformer.Kyverno().V1().ClusterPolicies(), - kyvernoInformer.Kyverno().V1().Policies(), - maxQueuedEvents, - omitEventsValues, logging.WithName("EventGenerator"), + omitEventsValues..., ) // engine engine := internal.NewEngine( @@ -283,7 +280,7 @@ func main() { } // start event generator var wg sync.WaitGroup - go eventGenerator.Run(ctx, 3, &wg) + go eventGenerator.Run(ctx, event.Workers, &wg) // setup leader election le, err := leaderelection.New( setup.Logger.WithName("leader-election"), diff --git a/pkg/clients/dclient/client_test.go b/pkg/clients/dclient/client_test.go index 6d4eb41188..e988cfc734 100644 --- a/pkg/clients/dclient/client_test.go +++ b/pkg/clients/dclient/client_test.go @@ -5,8 +5,8 @@ import ( "testing" "github.com/kyverno/kyverno/pkg/config" - kubeutils "github.com/kyverno/kyverno/pkg/utils/kube" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -27,6 +27,25 @@ type fixture struct { client Interface } +func newUnstructured(apiVersion, kind, namespace, name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": apiVersion, + "kind": kind, + "metadata": map[string]interface{}{ + "namespace": namespace, + "name": name, + }, + }, + } +} + +func newUnstructuredWithSpec(apiVersion, kind, namespace, name string, spec map[string]interface{}) *unstructured.Unstructured { + u := newUnstructured(apiVersion, kind, namespace, name) + u.Object["spec"] = spec + return u +} + func newFixture(t *testing.T) *fixture { // init groupversion regResource := []schema.GroupVersionResource{ @@ -44,12 +63,12 @@ func newFixture(t *testing.T) *fixture { } objects := []runtime.Object{ - kubeutils.NewUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), - kubeutils.NewUnstructured("group2/version", "TheKind", "ns-foo", "name2-foo"), - kubeutils.NewUnstructured("group/version", "TheKind", "ns-foo", "name-bar"), - kubeutils.NewUnstructured("group/version", "TheKind", "ns-foo", "name-baz"), - kubeutils.NewUnstructured("group2/version", "TheKind", "ns-foo", "name2-baz"), - kubeutils.NewUnstructured("apps/v1", "Deployment", config.KyvernoNamespace(), config.KyvernoDeploymentName()), + newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), + newUnstructured("group2/version", "TheKind", "ns-foo", "name2-foo"), + newUnstructured("group/version", "TheKind", "ns-foo", "name-bar"), + newUnstructured("group/version", "TheKind", "ns-foo", "name-baz"), + newUnstructured("group2/version", "TheKind", "ns-foo", "name2-baz"), + newUnstructured("apps/v1", "Deployment", config.KyvernoNamespace(), config.KyvernoDeploymentName()), } scheme := runtime.NewScheme() @@ -89,17 +108,17 @@ func TestCRUDResource(t *testing.T) { t.Errorf("DeleteResouce not working: %s", err) } // CreateResource - _, err = f.client.CreateResource(context.TODO(), "", "thekind", "ns-foo", kubeutils.NewUnstructured("group/version", "TheKind", "ns-foo", "name-foo1"), false) + _, err = f.client.CreateResource(context.TODO(), "", "thekind", "ns-foo", newUnstructured("group/version", "TheKind", "ns-foo", "name-foo1"), false) if err != nil { t.Errorf("CreateResource not working: %s", err) } // UpdateResource - _, err = f.client.UpdateResource(context.TODO(), "", "thekind", "ns-foo", kubeutils.NewUnstructuredWithSpec("group/version", "TheKind", "ns-foo", "name-foo1", map[string]interface{}{"foo": "bar"}), false) + _, err = f.client.UpdateResource(context.TODO(), "", "thekind", "ns-foo", newUnstructuredWithSpec("group/version", "TheKind", "ns-foo", "name-foo1", map[string]interface{}{"foo": "bar"}), false) if err != nil { t.Errorf("UpdateResource not working: %s", err) } // UpdateStatusResource - _, err = f.client.UpdateStatusResource(context.TODO(), "", "thekind", "ns-foo", kubeutils.NewUnstructuredWithSpec("group/version", "TheKind", "ns-foo", "name-foo1", map[string]interface{}{"foo": "status"}), false) + _, err = f.client.UpdateStatusResource(context.TODO(), "", "thekind", "ns-foo", newUnstructuredWithSpec("group/version", "TheKind", "ns-foo", "name-foo1", map[string]interface{}{"foo": "status"}), false) if err != nil { t.Errorf("UpdateStatusResource not working: %s", err) } diff --git a/pkg/event/controller.go b/pkg/event/controller.go index c29eac9330..de7ac0e156 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -3,58 +3,40 @@ package event import ( "context" "sync" - "time" "github.com/go-logr/logr" - kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1" - kyvernov2beta1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v2beta1" - kyvernov1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1" - kyvernov2beta1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v2beta1" + "github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme" "github.com/kyverno/kyverno/pkg/clients/dclient" - kubeutils "github.com/kyverno/kyverno/pkg/utils/kube" corev1 "k8s.io/api/core/v1" - errors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/events" - "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" ) const ( + Workers = 3 + CleanupWorkers = 3 eventWorkQueueName = "kyverno-events" workQueueRetryLimit = 3 ) // generator generate events type generator struct { - client dclient.Interface - // list/get cluster policy - cpLister kyvernov1listers.ClusterPolicyLister - // list/get policy - pLister kyvernov1listers.PolicyLister - // list/get cluster cleanup policy - clustercleanuppolLister kyvernov2beta1listers.ClusterCleanupPolicyLister - // list/get cleanup policy - cleanuppolLister kyvernov2beta1listers.CleanupPolicyLister - // queue to store event generation requests - queue workqueue.RateLimitingInterface - // events generated at policy controller - policyCtrRecorder events.EventRecorder - // events generated at admission control - admissionCtrRecorder events.EventRecorder - // events generated at namespaced policy controller to process 'generate' rule - genPolicyRecorder events.EventRecorder - // events generated at mutateExisting controller - mutateExistingRecorder events.EventRecorder - // events generated at cleanup controller - cleanupPolicyRecorder events.EventRecorder + // broadcaster + broadcaster events.EventBroadcaster - maxQueuedEvents int + // recorders + recorders map[Source]events.EventRecorder - omitEvents []string + // config + omitEvents sets.Set[string] + logger logr.Logger +} - log logr.Logger +// Interface to generate event +type Interface interface { + Add(infoList ...Info) } // Controller interface to generate event @@ -63,214 +45,84 @@ type Controller interface { Run(context.Context, int, *sync.WaitGroup) } -// Interface to generate event -type Interface interface { - Add(infoList ...Info) -} - // NewEventGenerator to generate a new event controller -func NewEventGenerator( - // source Source, - client dclient.Interface, - cpInformer kyvernov1informers.ClusterPolicyInformer, - pInformer kyvernov1informers.PolicyInformer, - maxQueuedEvents int, - omitEvents []string, - log logr.Logger, -) Controller { - gen := generator{ - client: client, - cpLister: cpInformer.Lister(), - pLister: pInformer.Lister(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter(), eventWorkQueueName), - policyCtrRecorder: NewRecorder(PolicyController, client.GetEventsInterface()), - admissionCtrRecorder: NewRecorder(AdmissionController, client.GetEventsInterface()), - genPolicyRecorder: NewRecorder(GeneratePolicyController, client.GetEventsInterface()), - mutateExistingRecorder: NewRecorder(MutateExistingController, client.GetEventsInterface()), - maxQueuedEvents: maxQueuedEvents, - omitEvents: omitEvents, - log: log, +func NewEventGenerator(client dclient.Interface, logger logr.Logger, omitEvents ...string) Controller { + return &generator{ + broadcaster: events.NewBroadcaster(&events.EventSinkImpl{ + Interface: client.GetEventsInterface(), + }), + omitEvents: sets.New(omitEvents...), + logger: logger, } - return &gen -} - -// NewEventGenerator to generate a new event cleanup controller -func NewEventCleanupGenerator( - // source Source, - client dclient.Interface, - clustercleanuppolInformer kyvernov2beta1informers.ClusterCleanupPolicyInformer, - cleanuppolInformer kyvernov2beta1informers.CleanupPolicyInformer, - maxQueuedEvents int, - log logr.Logger, -) Controller { - gen := generator{ - client: client, - clustercleanuppolLister: clustercleanuppolInformer.Lister(), - cleanuppolLister: cleanuppolInformer.Lister(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter(), eventWorkQueueName), - cleanupPolicyRecorder: NewRecorder(CleanupController, client.GetEventsInterface()), - maxQueuedEvents: maxQueuedEvents, - log: log, - } - return &gen } // Add queues an event for generation func (gen *generator) Add(infos ...Info) { - logger := gen.log + logger := gen.logger logger.V(3).Info("generating events", "count", len(infos)) - if gen.maxQueuedEvents == 0 || gen.queue.Len() > gen.maxQueuedEvents { - logger.V(2).Info("exceeds the event queue limit, dropping the event", "maxQueuedEvents", gen.maxQueuedEvents, "current size", gen.queue.Len()) - return - } for _, info := range infos { - if info.Name == "" { - // dont create event for resources with generateName - // as the name is not generated yet - logger.V(3).Info("skipping event creation for resource without a name", "kind", info.Kind, "name", info.Name, "namespace", info.Namespace) + // don't create event for resources with generateName as the name is not generated yet + if info.Regarding.Name == "" { + logger.V(3).Info("skipping event creation for resource without a name", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace) continue } - - shouldEmitEvent := true - for _, eventReason := range gen.omitEvents { - if info.Reason == Reason(eventReason) { - shouldEmitEvent = false - logger.V(6).Info("omitting event", "kind", info.Kind, "name", info.Name, "namespace", info.Namespace, "reason", info.Reason) - } - } - - if shouldEmitEvent { - gen.queue.Add(info) - logger.V(6).Info("creating event", "kind", info.Kind, "name", info.Name, "namespace", info.Namespace, "reason", info.Reason) + if gen.omitEvents.Has(string(info.Reason)) { + logger.V(6).Info("omitting event", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace, "reason", info.Reason) + continue } + gen.emitEvent(info) + logger.V(6).Info("creating event", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace, "reason", info.Reason) } } // Run begins generator func (gen *generator) Run(ctx context.Context, workers int, waitGroup *sync.WaitGroup) { - logger := gen.log + logger := gen.logger logger.Info("start") - defer logger.Info("shutting down") + defer logger.Info("terminated") defer utilruntime.HandleCrash() - defer gen.queue.ShutDown() - for i := 0; i < workers; i++ { - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - wait.UntilWithContext(ctx, gen.runWorker, time.Second) - }() + defer gen.stopRecorders() + defer logger.Info("shutting down...") + if err := gen.startRecorders(ctx); err != nil { + logger.Error(err, "failed to start recorders") + return } <-ctx.Done() } -func (gen *generator) runWorker(ctx context.Context) { - for gen.processNextWorkItem() { +func (gen *generator) startRecorders(ctx context.Context) error { + if err := gen.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil { + return err } + logger := klog.Background().V(int(0)) + // TODO: logger watcher should be stopped + if _, err := gen.broadcaster.StartLogging(logger); err != nil { + return err + } + gen.recorders = map[Source]events.EventRecorder{ + PolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(PolicyController)), + AdmissionController: gen.broadcaster.NewRecorder(scheme.Scheme, string(AdmissionController)), + GeneratePolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(GeneratePolicyController)), + MutateExistingController: gen.broadcaster.NewRecorder(scheme.Scheme, string(MutateExistingController)), + CleanupController: gen.broadcaster.NewRecorder(scheme.Scheme, string(CleanupController)), + } + return nil } -func (gen *generator) handleErr(err error, key interface{}) { - logger := gen.log - if err == nil { - gen.queue.Forget(key) - return - } - // This controller retries if something goes wrong. After that, it stops trying. - if gen.queue.NumRequeues(key) < workQueueRetryLimit { - logger.V(4).Info("retrying event generation", "key", key, "reason", err.Error()) - // Re-enqueue the key rate limited. Based on the rate limiter on the - // queue and the re-enqueue history, the key will be processed later again. - gen.queue.AddRateLimited(key) - return - } - gen.queue.Forget(key) - if !errors.IsNotFound(err) { - logger.Error(err, "failed to generate event", "key", key) - } +func (gen *generator) stopRecorders() { + gen.broadcaster.Shutdown() } -func (gen *generator) processNextWorkItem() bool { - obj, shutdown := gen.queue.Get() - if shutdown { - return false - } - defer gen.queue.Done(obj) - var key Info - var ok bool - if key, ok = obj.(Info); !ok { - gen.queue.Forget(obj) - gen.log.V(2).Info("Incorrect type; expected type 'info'", "obj", obj) - return true - } - err := gen.syncHandler(key) - gen.handleErr(err, obj) - return true -} - -func (gen *generator) syncHandler(key Info) error { - logger := gen.log - var regardingObj, relatedObj runtime.Object - var err error - switch key.Kind { - case "ClusterPolicy": - regardingObj, err = gen.cpLister.Get(key.Name) - if err != nil { - logger.Error(err, "failed to get cluster policy", "name", key.Name) - return err - } - case "Policy": - regardingObj, err = gen.pLister.Policies(key.Namespace).Get(key.Name) - if err != nil { - logger.Error(err, "failed to get policy", "name", key.Name) - return err - } - case "ClusterCleanupPolicy": - regardingObj, err = gen.clustercleanuppolLister.Get(key.Name) - if err != nil { - logger.Error(err, "failed to get cluster clean up policy", "name", key.Name) - return err - } - case "CleanupPolicy": - regardingObj, err = gen.cleanuppolLister.CleanupPolicies(key.Namespace).Get(key.Name) - if err != nil { - logger.Error(err, "failed to get cleanup policy", "name", key.Name) - return err - } - default: - regardingObj, err = gen.client.GetResource(context.TODO(), "", key.Kind, key.Namespace, key.Name) - if err != nil { - if !errors.IsNotFound(err) { - logger.Error(err, "failed to get resource", "kind", key.Kind, "name", key.Name, "namespace", key.Namespace) - return nil - } - return err - } - } - - relatedObj = kubeutils.NewUnstructured(key.RelatedAPIVersion, key.RelatedKind, key.RelatedNamespace, key.RelatedName) - - // set the event type based on reason - // if skip/pass, reason will be: NORMAL - // else reason will be: WARNING +func (gen *generator) emitEvent(key Info) { + logger := gen.logger eventType := corev1.EventTypeWarning if key.Reason == PolicyApplied || key.Reason == PolicySkipped { eventType = corev1.EventTypeNormal } - - logger.V(3).Info("creating the event", "source", key.Source, "type", eventType, "resource", key.Resource()) - // based on the source of event generation, use different event recorders - switch key.Source { - case AdmissionController: - gen.admissionCtrRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message) - case PolicyController: - gen.policyCtrRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message) - case GeneratePolicyController: - gen.genPolicyRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message) - case MutateExistingController: - gen.mutateExistingRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message) - case CleanupController: - gen.cleanupPolicyRecorder.Eventf(regardingObj, relatedObj, eventType, string(key.Reason), string(key.Action), key.Message) - default: + if recorder := gen.recorders[key.Source]; recorder != nil { + logger.V(3).Info("creating the event", "source", key.Source, "type", eventType, "resource", key.Resource()) + recorder.Eventf(&key.Regarding, key.Related, eventType, string(key.Reason), string(key.Action), key.Message) + } else { logger.Info("info.source not defined for the request") } - return nil } diff --git a/pkg/event/events.go b/pkg/event/events.go index 08fc563305..13d34268d5 100644 --- a/pkg/event/events.go +++ b/pkg/event/events.go @@ -7,7 +7,9 @@ import ( kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1" kyvernov2alpha1 "github.com/kyverno/kyverno/api/kyverno/v2alpha1" engineapi "github.com/kyverno/kyverno/pkg/engine/api" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" ) func NewPolicyFailEvent(source Source, reason Reason, engineResponse engineapi.EngineResponse, ruleResp engineapi.RuleResponse, blocked bool) Info { @@ -15,21 +17,29 @@ func NewPolicyFailEvent(source Source, reason Reason, engineResponse engineapi.E if blocked { action = ResourceBlocked } - pol := engineResponse.Policy() - + regarding := corev1.ObjectReference{ + // TODO: iirc it's not safe to assume api version is set + APIVersion: "kyverno.io/v1", + Kind: pol.GetKind(), + Name: pol.GetName(), + Namespace: pol.GetNamespace(), + UID: pol.MetaObject().GetUID(), + } + related := engineResponse.GetResourceSpec() return Info{ - Kind: pol.GetKind(), - Name: pol.GetName(), - Namespace: pol.GetNamespace(), - RelatedAPIVersion: engineResponse.GetResourceSpec().APIVersion, - RelatedKind: engineResponse.GetResourceSpec().Kind, - RelatedName: engineResponse.GetResourceSpec().Name, - RelatedNamespace: engineResponse.GetResourceSpec().Namespace, - Reason: reason, - Source: source, - Message: buildPolicyEventMessage(ruleResp, engineResponse.GetResourceSpec(), blocked), - Action: action, + Regarding: regarding, + Related: &corev1.ObjectReference{ + APIVersion: related.APIVersion, + Kind: related.Kind, + Name: related.Name, + Namespace: related.Namespace, + UID: types.UID(related.UID), + }, + Reason: reason, + Source: source, + Message: buildPolicyEventMessage(ruleResp, engineResponse.GetResourceSpec(), blocked), + Action: action, } } @@ -83,19 +93,28 @@ func NewPolicyAppliedEvent(source Source, engineResponse engineapi.EngineRespons fmt.Fprintf(&bldr, "%s: pass", res) action = ResourcePassed } - + regarding := corev1.ObjectReference{ + // TODO: iirc it's not safe to assume api version is set + APIVersion: "kyverno.io/v1", + Kind: policy.GetKind(), + Name: policy.GetName(), + Namespace: policy.GetNamespace(), + UID: policy.MetaObject().GetUID(), + } + related := engineResponse.GetResourceSpec() return Info{ - Kind: policy.GetKind(), - Name: policy.GetName(), - Namespace: policy.GetNamespace(), - RelatedAPIVersion: resource.GetAPIVersion(), - RelatedKind: resource.GetKind(), - RelatedName: resource.GetName(), - RelatedNamespace: resource.GetNamespace(), - Reason: PolicyApplied, - Source: source, - Message: bldr.String(), - Action: action, + Regarding: regarding, + Related: &corev1.ObjectReference{ + APIVersion: related.APIVersion, + Kind: related.Kind, + Name: related.Name, + Namespace: related.Namespace, + UID: types.UID(related.UID), + }, + Reason: PolicyApplied, + Source: source, + Message: bldr.String(), + Action: action, } } @@ -107,11 +126,15 @@ func NewResourceViolationEvent(source Source, reason Reason, engineResponse engi fmt.Fprintf(&bldr, "policy %s/%s %s: %s", pol.GetName(), ruleResp.Name(), ruleResp.Status(), ruleResp.Message()) resource := engineResponse.GetResourceSpec() - + regarding := corev1.ObjectReference{ + APIVersion: resource.APIVersion, + Kind: resource.Kind, + Name: resource.Name, + Namespace: resource.Namespace, + UID: types.UID(resource.UID), + } return Info{ - Kind: resource.Kind, - Name: resource.Name, - Namespace: resource.Namespace, + Regarding: regarding, Reason: reason, Source: source, Message: bldr.String(), @@ -121,11 +144,15 @@ func NewResourceViolationEvent(source Source, reason Reason, engineResponse engi func NewResourceGenerationEvent(policy, rule string, source Source, resource kyvernov1.ResourceSpec) Info { msg := fmt.Sprintf("Created %s %s as a result of applying policy %s/%s", resource.GetKind(), resource.GetName(), policy, rule) - + regarding := corev1.ObjectReference{ + APIVersion: resource.APIVersion, + Kind: resource.Kind, + Name: resource.Name, + Namespace: resource.Namespace, + UID: resource.UID, + } return Info{ - Kind: resource.GetKind(), - Namespace: resource.GetNamespace(), - Name: resource.GetName(), + Regarding: regarding, Source: source, Reason: PolicyApplied, Message: msg, @@ -135,18 +162,27 @@ func NewResourceGenerationEvent(policy, rule string, source Source, resource kyv func NewBackgroundFailedEvent(err error, policy kyvernov1.PolicyInterface, rule string, source Source, resource kyvernov1.ResourceSpec) []Info { var events []Info + regarding := corev1.ObjectReference{ + // TODO: iirc it's not safe to assume api version is set + APIVersion: "kyverno.io/v1", + Kind: policy.GetKind(), + Name: policy.GetName(), + Namespace: policy.GetNamespace(), + UID: policy.GetUID(), + } events = append(events, Info{ - Kind: policy.GetKind(), - Namespace: policy.GetNamespace(), - Name: policy.GetName(), - RelatedAPIVersion: resource.GetAPIVersion(), - RelatedKind: resource.GetKind(), - RelatedNamespace: resource.GetNamespace(), - RelatedName: resource.GetName(), - Source: source, - Reason: PolicyError, - Message: fmt.Sprintf("policy %s/%s error: %v", policy.GetName(), rule, err), - Action: None, + Regarding: regarding, + Related: &corev1.ObjectReference{ + APIVersion: resource.APIVersion, + Kind: resource.Kind, + Name: resource.Name, + Namespace: resource.Namespace, + UID: resource.UID, + }, + Source: source, + Reason: PolicyError, + Message: fmt.Sprintf("policy %s/%s error: %v", policy.GetName(), rule, err), + Action: None, }) return events @@ -156,25 +192,32 @@ func NewBackgroundSuccessEvent(source Source, policy kyvernov1.PolicyInterface, var events []Info msg := "resource generated" action := ResourceGenerated - if source == MutateExistingController { msg = "resource mutated" action = ResourceMutated } - + regarding := corev1.ObjectReference{ + // TODO: iirc it's not safe to assume api version is set + APIVersion: "kyverno.io/v1", + Kind: policy.GetKind(), + Name: policy.GetName(), + Namespace: policy.GetNamespace(), + UID: policy.GetUID(), + } for _, res := range resources { events = append(events, Info{ - Kind: policy.GetKind(), - Namespace: policy.GetNamespace(), - Name: policy.GetName(), - RelatedAPIVersion: res.GetAPIVersion(), - RelatedKind: res.GetKind(), - RelatedNamespace: res.GetNamespace(), - RelatedName: res.GetName(), - Source: source, - Reason: PolicyApplied, - Message: msg, - Action: action, + Regarding: regarding, + Related: &corev1.ObjectReference{ + APIVersion: res.APIVersion, + Kind: res.Kind, + Name: res.Name, + Namespace: res.Namespace, + UID: res.UID, + }, + Source: source, + Reason: PolicyApplied, + Message: msg, + Action: action, }) } @@ -192,104 +235,138 @@ func NewPolicyExceptionEvents(engineResponse engineapi.EngineResponse, ruleResp } else { exceptionMessage = fmt.Sprintf("resource %s was skipped from policy rule %s/%s/%s", resourceKey(engineResponse.PatchedResource), pol.GetNamespace(), pol.GetName(), ruleResp.Name()) } + regarding := corev1.ObjectReference{ + // TODO: iirc it's not safe to assume api version is set + APIVersion: "kyverno.io/v1", + Kind: pol.GetKind(), + Name: pol.GetName(), + Namespace: pol.GetNamespace(), + UID: pol.GetUID(), + } + related := engineResponse.GetResourceSpec() policyEvent := Info{ - Kind: pol.GetKind(), - Name: pol.GetName(), - Namespace: pol.GetNamespace(), - RelatedAPIVersion: engineResponse.PatchedResource.GetAPIVersion(), - RelatedKind: engineResponse.PatchedResource.GetKind(), - RelatedName: engineResponse.PatchedResource.GetName(), - RelatedNamespace: engineResponse.PatchedResource.GetNamespace(), - Reason: PolicySkipped, - Message: policyMessage, - Source: source, - Action: ResourcePassed, + Regarding: regarding, + Related: &corev1.ObjectReference{ + APIVersion: related.APIVersion, + Kind: related.Kind, + Name: related.Name, + Namespace: related.Namespace, + UID: types.UID(related.UID), + }, + Reason: PolicySkipped, + Message: policyMessage, + Source: source, + Action: ResourcePassed, } exceptionEvent := Info{ - Kind: "PolicyException", - Name: exceptionName, - Namespace: exceptionNamespace, - RelatedAPIVersion: engineResponse.PatchedResource.GetAPIVersion(), - RelatedKind: engineResponse.PatchedResource.GetKind(), - RelatedName: engineResponse.PatchedResource.GetName(), - RelatedNamespace: engineResponse.PatchedResource.GetNamespace(), - Reason: PolicySkipped, - Message: exceptionMessage, - Source: source, - Action: ResourcePassed, + Regarding: corev1.ObjectReference{ + // TODO: iirc it's not safe to assume api version is set + APIVersion: "kyverno.io/v2", + Kind: "PolicyException", + Name: exceptionName, + Namespace: exceptionNamespace, + UID: exception.GetUID(), + }, + Related: &corev1.ObjectReference{ + APIVersion: related.APIVersion, + Kind: related.Kind, + Name: related.Name, + Namespace: related.Namespace, + UID: types.UID(related.UID), + }, + Reason: PolicySkipped, + Message: exceptionMessage, + Source: source, + Action: ResourcePassed, } return []Info{policyEvent, exceptionEvent} } func NewCleanupPolicyEvent(policy kyvernov2alpha1.CleanupPolicyInterface, resource unstructured.Unstructured, err error) Info { + regarding := corev1.ObjectReference{ + // TODO: iirc it's not safe to assume api version is set + APIVersion: "kyverno.io/v2beta1", + Kind: policy.GetKind(), + Name: policy.GetName(), + Namespace: policy.GetNamespace(), + UID: policy.GetUID(), + } + related := &corev1.ObjectReference{ + APIVersion: resource.GetAPIVersion(), + Kind: resource.GetKind(), + Namespace: resource.GetNamespace(), + Name: resource.GetName(), + } if err == nil { return Info{ - Kind: policy.GetKind(), - Namespace: policy.GetNamespace(), - Name: policy.GetName(), - RelatedAPIVersion: resource.GetAPIVersion(), - RelatedKind: resource.GetKind(), - RelatedNamespace: resource.GetNamespace(), - RelatedName: resource.GetName(), - Source: CleanupController, - Action: ResourceCleanedUp, - Reason: PolicyApplied, - Message: fmt.Sprintf("successfully cleaned up the target resource %v/%v/%v", resource.GetKind(), resource.GetNamespace(), resource.GetName()), + Regarding: regarding, + Related: related, + Source: CleanupController, + Action: ResourceCleanedUp, + Reason: PolicyApplied, + Message: fmt.Sprintf("successfully cleaned up the target resource %v/%v/%v", resource.GetKind(), resource.GetNamespace(), resource.GetName()), } } else { return Info{ - Kind: policy.GetKind(), - Namespace: policy.GetNamespace(), - Name: policy.GetName(), - RelatedAPIVersion: resource.GetAPIVersion(), - RelatedKind: resource.GetKind(), - RelatedNamespace: resource.GetNamespace(), - RelatedName: resource.GetName(), - Source: CleanupController, - Action: None, - Reason: PolicyError, - Message: fmt.Sprintf("failed to clean up the target resource %v/%v/%v: %v", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err.Error()), + Regarding: regarding, + Related: related, + Source: CleanupController, + Action: None, + Reason: PolicyError, + Message: fmt.Sprintf("failed to clean up the target resource %v/%v/%v: %v", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err.Error()), } } } func NewValidatingAdmissionPolicyEvent(policy kyvernov1.PolicyInterface, vapName, vapBindingName string) []Info { + regarding := corev1.ObjectReference{ + // TODO: iirc it's not safe to assume api version is set + APIVersion: "kyverno.io/v1", + Kind: policy.GetKind(), + Name: policy.GetName(), + Namespace: policy.GetNamespace(), + UID: policy.GetUID(), + } vapEvent := Info{ - Kind: policy.GetKind(), - Namespace: policy.GetNamespace(), - Name: policy.GetName(), - RelatedAPIVersion: "admissionregistration.k8s.io/v1alpha1", - RelatedKind: "ValidatingAdmissionPolicy", - RelatedName: vapName, - Source: GeneratePolicyController, - Action: ResourceGenerated, - Reason: PolicyApplied, - Message: fmt.Sprintf("successfully generated validating admission policy %s from policy %s", vapName, policy.GetName()), + Regarding: regarding, + Related: &corev1.ObjectReference{ + APIVersion: "admissionregistration.k8s.io/v1alpha1", + Kind: "ValidatingAdmissionPolicy", + Name: vapName, + }, + Source: GeneratePolicyController, + Action: ResourceGenerated, + Reason: PolicyApplied, + Message: fmt.Sprintf("successfully generated validating admission policy %s from policy %s", vapName, policy.GetName()), } vapBindingEvent := Info{ - Kind: policy.GetKind(), - Namespace: policy.GetNamespace(), - Name: policy.GetName(), - RelatedAPIVersion: "admissionregistration.k8s.io/v1alpha1", - RelatedKind: "ValidatingAdmissionPolicyBinding", - RelatedName: vapBindingName, - Source: GeneratePolicyController, - Action: ResourceGenerated, - Reason: PolicyApplied, - Message: fmt.Sprintf("successfully generated validating admission policy binding %s from policy %s", vapBindingName, policy.GetName()), + Regarding: regarding, + Related: &corev1.ObjectReference{ + APIVersion: "admissionregistration.k8s.io/v1alpha1", + Kind: "ValidatingAdmissionPolicyBinding", + Name: vapBindingName, + }, + Source: GeneratePolicyController, + Action: ResourceGenerated, + Reason: PolicyApplied, + Message: fmt.Sprintf("successfully generated validating admission policy binding %s from policy %s", vapBindingName, policy.GetName()), } return []Info{vapEvent, vapBindingEvent} } func NewFailedEvent(err error, policy, rule string, source Source, resource kyvernov1.ResourceSpec) Info { return Info{ - Kind: resource.GetKind(), - Namespace: resource.GetNamespace(), - Name: resource.GetName(), - Source: source, - Reason: PolicyError, - Message: fmt.Sprintf("policy %s/%s error: %v", policy, rule, err), - Action: None, + Regarding: corev1.ObjectReference{ + APIVersion: resource.APIVersion, + Kind: resource.Kind, + Name: resource.Name, + Namespace: resource.Namespace, + UID: resource.UID, + }, + Source: source, + Reason: PolicyError, + Message: fmt.Sprintf("policy %s/%s error: %v", policy, rule, err), + Action: None, } } diff --git a/pkg/event/info.go b/pkg/event/info.go index 00b6bddc9e..d3d054414e 100644 --- a/pkg/event/info.go +++ b/pkg/event/info.go @@ -1,25 +1,24 @@ package event -import "strings" +import ( + "strings" + + corev1 "k8s.io/api/core/v1" +) // Info defines the event details type Info struct { - Kind string - Name string - Namespace string - RelatedAPIVersion string - RelatedKind string - RelatedName string - RelatedNamespace string - Reason Reason - Message string - Action Action - Source Source + Regarding corev1.ObjectReference + Related *corev1.ObjectReference + Reason Reason + Message string + Action Action + Source Source } func (i *Info) Resource() string { - if i.Namespace == "" { - return strings.Join([]string{i.Kind, i.Name}, "/") + if i.Regarding.Namespace == "" { + return strings.Join([]string{i.Regarding.Kind, i.Regarding.Name}, "/") } - return strings.Join([]string{i.Kind, i.Namespace, i.Name}, "/") + return strings.Join([]string{i.Regarding.Kind, i.Regarding.Namespace, i.Regarding.Name}, "/") } diff --git a/pkg/event/recorder.go b/pkg/event/recorder.go deleted file mode 100644 index ba9c51fbcf..0000000000 --- a/pkg/event/recorder.go +++ /dev/null @@ -1,21 +0,0 @@ -package event - -import ( - "github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - typedeventsv1 "k8s.io/client-go/kubernetes/typed/events/v1" - "k8s.io/client-go/tools/events" -) - -func NewRecorder(source Source, sink typedeventsv1.EventsV1Interface) events.EventRecorder { - utilruntime.Must(scheme.AddToScheme(scheme.Scheme)) - eventBroadcaster := events.NewBroadcaster( - &events.EventSinkImpl{ - Interface: sink, - }, - ) - eventBroadcaster.StartStructuredLogging(0) - stopCh := make(chan struct{}) - eventBroadcaster.StartRecordingToSink(stopCh) - return eventBroadcaster.NewRecorder(scheme.Scheme, string(source)) -} diff --git a/pkg/utils/kube/unstructured.go b/pkg/utils/kube/unstructured.go index 5cc2a153ee..ccbd680a5e 100644 --- a/pkg/utils/kube/unstructured.go +++ b/pkg/utils/kube/unstructured.go @@ -28,22 +28,3 @@ func ObjToUnstructured(obj interface{}) (*unstructured.Unstructured, error) { } return &unstructured.Unstructured{Object: unstrObj}, nil } - -func NewUnstructured(apiVersion, kind, namespace, name string) *unstructured.Unstructured { - return &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": apiVersion, - "kind": kind, - "metadata": map[string]interface{}{ - "namespace": namespace, - "name": name, - }, - }, - } -} - -func NewUnstructuredWithSpec(apiVersion, kind, namespace, name string, spec map[string]interface{}) *unstructured.Unstructured { - u := NewUnstructured(apiVersion, kind, namespace, name) - u.Object["spec"] = spec - return u -}