From 56aae9f5057b1540bd83fb0563100d9c0d3d94d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Edouard=20Br=C3=A9t=C3=A9ch=C3=A9?= Date: Thu, 24 Nov 2022 14:21:08 +0100 Subject: [PATCH] fix: admission reports stacking up (#5457) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: admission reports stacking up Signed-off-by: Charles-Edouard Brétéché * utils Signed-off-by: Charles-Edouard Brétéché * warmup Signed-off-by: Charles-Edouard Brétéché * cleanup Signed-off-by: Charles-Edouard Brétéché * fix Signed-off-by: Charles-Edouard Brétéché * fix Signed-off-by: Charles-Edouard Brétéché * fix Signed-off-by: Charles-Edouard Brétéché * fix logger Signed-off-by: Charles-Edouard Brétéché * nits Signed-off-by: Charles-Edouard Brétéché Signed-off-by: Charles-Edouard Brétéché --- .../v1alpha2/admission_report_types.go | 2 + charts/kyverno/templates/crds.yaml | 8 + cmd/kyverno/main.go | 45 +++- config/crds/kyverno.io_admissionreports.yaml | 4 + .../kyverno.io_clusteradmissionreports.yaml | 4 + config/install.yaml | 8 + config/install_debug.yaml | 8 + .../report/admission/controller.go | 247 ++++++++++++------ .../report/aggregate/controller.go | 33 ++- .../report/background/controller.go | 36 +-- pkg/controllers/report/resource/controller.go | 30 ++- pkg/controllers/report/utils/utils.go | 33 +++ pkg/utils/controller/metadata.go | 9 + pkg/utils/controller/run.go | 4 +- pkg/utils/report/labels.go | 2 + pkg/utils/report/new.go | 41 ++- .../resource/imageverification/handler.go | 2 +- .../resource/validation/validation.go | 2 +- 18 files changed, 339 insertions(+), 179 deletions(-) diff --git a/api/kyverno/v1alpha2/admission_report_types.go b/api/kyverno/v1alpha2/admission_report_types.go index e6bd3e0db6..a99c88494f 100644 --- a/api/kyverno/v1alpha2/admission_report_types.go +++ b/api/kyverno/v1alpha2/admission_report_types.go @@ -49,6 +49,7 @@ type AdmissionReportSpec struct { // +kubebuilder:printcolumn:name="Skip",type=integer,JSONPath=".spec.summary.skip" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // +kubebuilder:printcolumn:name="Hash",type=string,JSONPath=".metadata.labels['audit\\.kyverno\\.io/resource\\.hash']",priority=1 +// +kubebuilder:printcolumn:name="AGGREGATE",type=string,JSONPath=".metadata.labels['audit\\.kyverno\\.io/report\\.aggregate']",priority=1 // AdmissionReport is the Schema for the AdmissionReports API type AdmissionReport struct { @@ -85,6 +86,7 @@ func (r *AdmissionReport) SetSummary(summary policyreportv1alpha2.PolicyReportSu // +kubebuilder:printcolumn:name="Skip",type=integer,JSONPath=".spec.summary.skip" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // +kubebuilder:printcolumn:name="Hash",type=string,JSONPath=".metadata.labels['audit\\.kyverno\\.io/resource\\.hash']",priority=1 +// +kubebuilder:printcolumn:name="AGGREGATE",type=string,JSONPath=".metadata.labels['audit\\.kyverno\\.io/report\\.aggregate']",priority=1 // ClusterAdmissionReport is the Schema for the ClusterAdmissionReports API type ClusterAdmissionReport struct { diff --git a/charts/kyverno/templates/crds.yaml b/charts/kyverno/templates/crds.yaml index ddcd1834ea..96b2553759 100644 --- a/charts/kyverno/templates/crds.yaml +++ b/charts/kyverno/templates/crds.yaml @@ -64,6 +64,10 @@ spec: name: Hash priority: 1 type: string + - jsonPath: .metadata.labels['audit\.kyverno\.io/report\.aggregate'] + name: AGGREGATE + priority: 1 + type: string name: v1alpha2 schema: openAPIV3Schema: @@ -1505,6 +1509,10 @@ spec: name: Hash priority: 1 type: string + - jsonPath: .metadata.labels['audit\.kyverno\.io/report\.aggregate'] + name: AGGREGATE + priority: 1 + type: string name: v1alpha2 schema: openAPIV3Schema: diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 89bdbc534a..e8cc7ed4b6 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -263,8 +263,9 @@ func createReportControllers( metadataFactory metadatainformers.SharedInformerFactory, kubeInformer kubeinformers.SharedInformerFactory, kyvernoInformer kyvernoinformer.SharedInformerFactory, -) []controller { +) ([]controller, func(context.Context) error) { var ctrls []controller + var warmups []func(context.Context) error kyvernoV1 := kyvernoInformer.Kyverno().V1() if backgroundScan || admissionReports { resourceReportController := resourcereportcontroller.NewController( @@ -272,6 +273,9 @@ func createReportControllers( kyvernoV1.Policies(), kyvernoV1.ClusterPolicies(), ) + warmups = append(warmups, func(ctx context.Context) error { + return resourceReportController.Warmup(ctx) + }) ctrls = append(ctrls, newController( resourcereportcontroller.ControllerName, resourceReportController, @@ -316,7 +320,14 @@ func createReportControllers( )) } } - return ctrls + return ctrls, func(ctx context.Context) error { + for _, warmup := range warmups { + if err := warmup(ctx); err != nil { + return err + } + } + return nil + } } func createrLeaderControllers( @@ -332,7 +343,7 @@ func createrLeaderControllers( eventGenerator event.Interface, certRenewer tls.CertRenewer, runtime runtimeutils.Runtime, -) ([]controller, error) { +) ([]controller, func(context.Context) error, error) { policyCtrl, err := policy.NewPolicyController( kyvernoClient, dynamicClient, @@ -347,7 +358,7 @@ func createrLeaderControllers( metricsConfig, ) if err != nil { - return nil, err + return nil, nil, err } certManager := certmanager.NewController( kubeKyvernoInformer.Core().V1().Secrets(), @@ -373,22 +384,24 @@ func createrLeaderControllers( admissionReports, runtime, ) + reportControllers, warmup := createReportControllers( + backgroundScan, + admissionReports, + dynamicClient, + kyvernoClient, + metadataInformer, + kubeInformer, + kyvernoInformer, + ) return append( []controller{ newController("policy-controller", policyCtrl, 2), newController(certmanager.ControllerName, certManager, certmanager.Workers), newController(webhookcontroller.ControllerName, webhookController, webhookcontroller.Workers), }, - createReportControllers( - backgroundScan, - admissionReports, - dynamicClient, - kyvernoClient, - metadataInformer, - kubeInformer, - kyvernoInformer, - )..., + reportControllers..., ), + warmup, nil } @@ -541,7 +554,7 @@ func main() { kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod) metadataInformer := metadatainformers.NewSharedInformerFactory(metadataClient, 15*time.Minute) // create leader controllers - leaderControllers, err := createrLeaderControllers( + leaderControllers, warmup, err := createrLeaderControllers( kubeInformer, kubeKyvernoInformer, kyvernoInformer, @@ -569,6 +582,10 @@ func main() { // TODO: shall we just exit ? logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync") } + if err := warmup(ctx); err != nil { + logger.Error(err, "failed to run warmup") + os.Exit(1) + } // start leader controllers var wg sync.WaitGroup for _, controller := range leaderControllers { diff --git a/config/crds/kyverno.io_admissionreports.yaml b/config/crds/kyverno.io_admissionreports.yaml index 6f52841b62..3baafd3e52 100644 --- a/config/crds/kyverno.io_admissionreports.yaml +++ b/config/crds/kyverno.io_admissionreports.yaml @@ -55,6 +55,10 @@ spec: name: Hash priority: 1 type: string + - jsonPath: .metadata.labels['audit\.kyverno\.io/report\.aggregate'] + name: AGGREGATE + priority: 1 + type: string name: v1alpha2 schema: openAPIV3Schema: diff --git a/config/crds/kyverno.io_clusteradmissionreports.yaml b/config/crds/kyverno.io_clusteradmissionreports.yaml index 2d9eddf5ca..80400daf39 100644 --- a/config/crds/kyverno.io_clusteradmissionreports.yaml +++ b/config/crds/kyverno.io_clusteradmissionreports.yaml @@ -55,6 +55,10 @@ spec: name: Hash priority: 1 type: string + - jsonPath: .metadata.labels['audit\.kyverno\.io/report\.aggregate'] + name: AGGREGATE + priority: 1 + type: string name: v1alpha2 schema: openAPIV3Schema: diff --git a/config/install.yaml b/config/install.yaml index 4dd42fe537..c7c9b8106b 100644 --- a/config/install.yaml +++ b/config/install.yaml @@ -72,6 +72,10 @@ spec: name: Hash priority: 1 type: string + - jsonPath: .metadata.labels['audit\.kyverno\.io/report\.aggregate'] + name: AGGREGATE + priority: 1 + type: string name: v1alpha2 schema: openAPIV3Schema: @@ -2160,6 +2164,10 @@ spec: name: Hash priority: 1 type: string + - jsonPath: .metadata.labels['audit\.kyverno\.io/report\.aggregate'] + name: AGGREGATE + priority: 1 + type: string name: v1alpha2 schema: openAPIV3Schema: diff --git a/config/install_debug.yaml b/config/install_debug.yaml index bb0923c599..889f902668 100644 --- a/config/install_debug.yaml +++ b/config/install_debug.yaml @@ -70,6 +70,10 @@ spec: name: Hash priority: 1 type: string + - jsonPath: .metadata.labels['audit\.kyverno\.io/report\.aggregate'] + name: AGGREGATE + priority: 1 + type: string name: v1alpha2 schema: openAPIV3Schema: @@ -2155,6 +2159,10 @@ spec: name: Hash priority: 1 type: string + - jsonPath: .metadata.labels['audit\.kyverno\.io/report\.aggregate'] + name: AGGREGATE + priority: 1 + type: string name: v1alpha2 schema: openAPIV3Schema: diff --git a/pkg/controllers/report/admission/controller.go b/pkg/controllers/report/admission/controller.go index 00e1b7862c..6e3d0a2834 100644 --- a/pkg/controllers/report/admission/controller.go +++ b/pkg/controllers/report/admission/controller.go @@ -6,14 +6,17 @@ import ( "github.com/go-logr/logr" kyvernov1alpha2 "github.com/kyverno/kyverno/api/kyverno/v1alpha2" + policyreportv1alpha2 "github.com/kyverno/kyverno/api/policyreport/v1alpha2" "github.com/kyverno/kyverno/pkg/client/clientset/versioned" "github.com/kyverno/kyverno/pkg/controllers" "github.com/kyverno/kyverno/pkg/controllers/report/resource" + "github.com/kyverno/kyverno/pkg/controllers/report/utils" controllerutils "github.com/kyverno/kyverno/pkg/utils/controller" reportutils "github.com/kyverno/kyverno/pkg/utils/report" + "go.uber.org/multierr" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" metadatainformers "k8s.io/client-go/metadata/metadatainformer" @@ -37,9 +40,7 @@ type controller struct { cadmrLister cache.GenericLister // queue - queue workqueue.RateLimitingInterface - admrEnqueue controllerutils.EnqueueFunc - cadmrEnqueue controllerutils.EnqueueFunc + queue workqueue.RateLimitingInterface // cache metadataCache resource.MetadataCache @@ -58,19 +59,21 @@ func NewController( admrLister: admrInformer.Lister(), cadmrLister: cadmrInformer.Lister(), queue: queue, - admrEnqueue: controllerutils.AddDefaultEventHandlers(logger, admrInformer.Informer(), queue), - cadmrEnqueue: controllerutils.AddDefaultEventHandlers(logger, cadmrInformer.Informer(), queue), metadataCache: metadataCache, } - c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, _ resource.Resource) { - selector, err := reportutils.SelectorResourceUidEquals(uid) - if err != nil { - logger.Error(err, "failed to create label selector") - } - if err := c.enqueue(selector); err != nil { - logger.Error(err, "failed to enqueue") - } - }) + c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, _ resource.Resource) { queue.Add(cache.ExplicitKey(uid)) }) + controllerutils.AddEventHandlersT( + admrInformer.Informer(), + func(obj metav1.Object) { queue.Add(cache.ExplicitKey(reportutils.GetResourceUid(obj))) }, + func(old, obj metav1.Object) { queue.Add(cache.ExplicitKey(reportutils.GetResourceUid(old))) }, + func(obj metav1.Object) { queue.Add(cache.ExplicitKey(reportutils.GetResourceUid(obj))) }, + ) + controllerutils.AddEventHandlersT( + cadmrInformer.Informer(), + func(obj metav1.Object) { queue.Add(cache.ExplicitKey(reportutils.GetResourceUid(obj))) }, + func(old, obj metav1.Object) { queue.Add(cache.ExplicitKey(reportutils.GetResourceUid(old))) }, + func(obj metav1.Object) { queue.Add(cache.ExplicitKey(reportutils.GetResourceUid(obj))) }, + ) return &c } @@ -78,46 +81,6 @@ func (c *controller) Run(ctx context.Context, workers int) { controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile) } -func (c *controller) enqueue(selector labels.Selector) error { - admrs, err := c.admrLister.List(selector) - if err != nil { - return err - } - for _, adm := range admrs { - err = c.admrEnqueue(adm) - if err != nil { - logger.Error(err, "failed to enqueue") - } - } - cadmrs, err := c.cadmrLister.List(selector) - if err != nil { - return err - } - for _, cadmr := range cadmrs { - err = c.admrEnqueue(cadmr) - if err != nil { - logger.Error(err, "failed to enqueue") - } - } - return nil -} - -func (c *controller) getMeta(namespace, name string) (metav1.Object, error) { - if namespace == "" { - obj, err := c.cadmrLister.Get(name) - if err != nil { - return nil, err - } - return obj.(metav1.Object), err - } else { - obj, err := c.admrLister.ByNamespace(namespace).Get(name) - if err != nil { - return nil, err - } - return obj.(metav1.Object), err - } -} - func (c *controller) deleteReport(ctx context.Context, namespace, name string) error { if namespace == "" { return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Delete(ctx, name, metav1.DeleteOptions{}) @@ -126,7 +89,7 @@ func (c *controller) deleteReport(ctx context.Context, namespace, name string) e } } -func (c *controller) getReport(ctx context.Context, namespace, name string) (kyvernov1alpha2.ReportInterface, error) { +func (c *controller) fetchReport(ctx context.Context, namespace, name string) (kyvernov1alpha2.ReportInterface, error) { if namespace == "" { return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Get(ctx, name, metav1.GetOptions{}) } else { @@ -134,44 +97,154 @@ func (c *controller) getReport(ctx context.Context, namespace, name string) (kyv } } -func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error { - // try to find meta from the cache - meta, err := c.getMeta(namespace, name) +func (c *controller) getReports(uid types.UID) ([]metav1.Object, error) { + selector, err := reportutils.SelectorResourceUidEquals(uid) if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err + return nil, err } - // try to find resource from the cache - uid := reportutils.GetResourceUid(meta) - resource, gvk, found := c.metadataCache.GetResourceHash(uid) - // set owner if not done yet - if found && len(meta.GetOwnerReferences()) == 0 { - report, err := c.getReport(ctx, namespace, name) - if err != nil { + var results []metav1.Object + admrs, err := c.admrLister.List(selector) + if err != nil { + return nil, err + } + for _, admr := range admrs { + results = append(results, admr.(metav1.Object)) + } + cadmrs, err := c.cadmrLister.List(selector) + if err != nil { + return nil, err + } + for _, cadmr := range cadmrs { + results = append(results, cadmr.(metav1.Object)) + } + return results, nil +} + +func mergeReports(accumulator map[string]policyreportv1alpha2.PolicyReportResult, reports ...kyvernov1alpha2.ReportInterface) { + for _, report := range reports { + if len(report.GetOwnerReferences()) == 1 { + ownerRef := report.GetOwnerReferences()[0] + objectRefs := []corev1.ObjectReference{{ + APIVersion: ownerRef.APIVersion, + Kind: ownerRef.Kind, + Namespace: report.GetNamespace(), + Name: ownerRef.Name, + UID: ownerRef.UID, + }} + for _, result := range report.GetResults() { + key := result.Policy + "/" + result.Rule + "/" + string(ownerRef.UID) + result.Resources = objectRefs + if rule, exists := accumulator[key]; !exists { + accumulator[key] = result + } else if rule.Timestamp.Seconds < result.Timestamp.Seconds { + accumulator[key] = result + } + } + } + } +} + +func (c *controller) aggregateReports(ctx context.Context, uid types.UID, gvk schema.GroupVersionKind, res resource.Resource, reports ...metav1.Object) error { + before, err := c.fetchReport(ctx, res.Namespace, string(uid)) + if err != nil { + if !apierrors.IsNotFound(err) { return err } - controllerutils.SetOwner(report, gvk.GroupVersion().String(), gvk.Kind, resource.Name, uid) - _, err = reportutils.UpdateReport(ctx, report, c.client) - return err + before = reportutils.NewAdmissionReport(res.Namespace, string(uid), res.Name, uid, metav1.GroupVersionKind(gvk)) } - // cleanup old reports - // if they are not the same version as the current resource version - // and were created more than 2 minutes ago - if !found { - // if we didn't find the resource, either no policy exist for this kind - // or the resource was never created, we delete the report if it has no owner - // and was created more than 2 minutes ago - if len(meta.GetOwnerReferences()) == 0 && meta.GetCreationTimestamp().Add(time.Minute*2).Before(time.Now()) { - return c.deleteReport(ctx, namespace, name) + merged := map[string]policyreportv1alpha2.PolicyReportResult{} + for _, report := range reports { + if reportutils.GetResourceHash(report) == res.Hash { + if report.GetName() == string(uid) { + mergeReports(merged, before) + } else { + // TODO: see if we can use List instead of fetching reports one by one + report, err := c.fetchReport(ctx, report.GetNamespace(), report.GetName()) + if err != nil { + return err + } + mergeReports(merged, report) + } + } + } + var results []policyreportv1alpha2.PolicyReportResult + for _, result := range merged { + results = append(results, result) + } + after := before + if before.GetResourceVersion() != "" { + after = reportutils.DeepCopy(before) + } + controllerutils.SetOwner(after, gvk.GroupVersion().String(), gvk.Kind, res.Name, uid) + controllerutils.SetLabel(after, reportutils.LabelResourceHash, res.Hash) + controllerutils.SetLabel(after, reportutils.LabelAggregatedReport, res.Hash) + reportutils.SetResults(after, results...) + if after.GetResourceVersion() == "" { + if len(results) > 0 { + if _, err := reportutils.CreateReport(ctx, after, c.client); err != nil { + return err + } } } else { - // if hashes don't match and the report was created more than 2 - // minutes ago we consider it obsolete and delete the report - if !reportutils.CompareHash(meta, resource.Hash) && meta.GetCreationTimestamp().Add(time.Minute*2).Before(time.Now()) { - return c.deleteReport(ctx, namespace, name) + if len(results) == 0 { + if err := c.deleteReport(ctx, after.GetNamespace(), after.GetName()); err != nil { + return err + } + } else { + if !utils.ReportsAreIdentical(before, after) { + if _, err = reportutils.UpdateReport(ctx, after, c.client); err != nil { + return err + } + } } } - return nil + return c.cleanupReports(ctx, uid, res.Hash, reports...) +} + +func (c *controller) cleanupReports(ctx context.Context, uid types.UID, hash string, reports ...metav1.Object) error { + var toDelete []metav1.Object + for _, report := range reports { + if report.GetName() != string(uid) { + if reportutils.GetResourceHash(report) == hash || report.GetCreationTimestamp().Add(time.Minute*2).Before(time.Now()) { + toDelete = append(toDelete, report) + } else { + c.queue.AddAfter(cache.ExplicitKey(uid), time.Minute*2) + } + } + } + var errs []error + for _, report := range toDelete { + if err := c.deleteReport(ctx, report.GetNamespace(), report.GetName()); err != nil { + errs = append(errs, err) + } + } + return multierr.Combine(errs...) +} + +func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, _, _ string) error { + uid := types.UID(key) + // find related reports + reports, err := c.getReports(uid) + if err != nil { + return err + } + // is the resource known + resource, gvk, found := c.metadataCache.GetResourceHash(uid) + if !found { + return c.cleanupReports(ctx, "", "", reports...) + } + // set orphan reports an owner + for _, report := range reports { + if len(report.GetOwnerReferences()) == 0 { + report, err := c.fetchReport(ctx, report.GetNamespace(), report.GetName()) + if err != nil { + return err + } + controllerutils.SetOwner(report, gvk.GroupVersion().String(), gvk.Kind, resource.Name, uid) + _, err = reportutils.UpdateReport(ctx, report, c.client) + return err + } + } + // build an aggregated report + return c.aggregateReports(ctx, uid, gvk, resource, reports...) } diff --git a/pkg/controllers/report/aggregate/controller.go b/pkg/controllers/report/aggregate/controller.go index 2591708ad4..15ddf5909c 100644 --- a/pkg/controllers/report/aggregate/controller.go +++ b/pkg/controllers/report/aggregate/controller.go @@ -28,7 +28,6 @@ import ( ) // TODO: resync in resource controller -// TODO: error handling in resource controller // TODO: policy hash const ( @@ -98,10 +97,26 @@ func NewController( delay := 15 * time.Second controllerutils.AddDelayedExplicitEventHandlers(logger, polrInformer.Informer(), c.queue, delay, keyFunc) controllerutils.AddDelayedExplicitEventHandlers(logger, cpolrInformer.Informer(), c.queue, delay, keyFunc) - controllerutils.AddDelayedExplicitEventHandlers(logger, admrInformer.Informer(), c.queue, delay, keyFunc) - controllerutils.AddDelayedExplicitEventHandlers(logger, cadmrInformer.Informer(), c.queue, delay, keyFunc) controllerutils.AddDelayedExplicitEventHandlers(logger, bgscanrInformer.Informer(), c.queue, delay, keyFunc) controllerutils.AddDelayedExplicitEventHandlers(logger, cbgscanrInformer.Informer(), c.queue, delay, keyFunc) + enqueueFromAdmr := func(obj metav1.Object) { + // no need to consider non aggregated reports + if controllerutils.HasLabel(obj, reportutils.LabelAggregatedReport) { + c.queue.AddAfter(keyFunc(obj), delay) + } + } + controllerutils.AddEventHandlersT( + admrInformer.Informer(), + func(obj metav1.Object) { enqueueFromAdmr(obj) }, + func(_, obj metav1.Object) { enqueueFromAdmr(obj) }, + func(obj metav1.Object) { enqueueFromAdmr(obj) }, + ) + controllerutils.AddEventHandlersT( + cadmrInformer.Informer(), + func(obj metav1.Object) { enqueueFromAdmr(obj) }, + func(_, obj metav1.Object) { enqueueFromAdmr(obj) }, + func(obj metav1.Object) { enqueueFromAdmr(obj) }, + ) return &c } @@ -114,8 +129,10 @@ func (c *controller) mergeAdmissionReports(ctx context.Context, namespace string next := "" for { cadms, err := c.client.KyvernoV1alpha2().ClusterAdmissionReports().List(ctx, metav1.ListOptions{ - Limit: mergeLimit, - Continue: next, + // no need to consider non aggregated reports + LabelSelector: reportutils.LabelAggregatedReport, + Limit: mergeLimit, + Continue: next, }) if err != nil { return err @@ -132,8 +149,10 @@ func (c *controller) mergeAdmissionReports(ctx context.Context, namespace string next := "" for { adms, err := c.client.KyvernoV1alpha2().AdmissionReports(namespace).List(ctx, metav1.ListOptions{ - Limit: mergeLimit, - Continue: next, + // no need to consider non aggregated reports + LabelSelector: reportutils.LabelAggregatedReport, + Limit: mergeLimit, + Continue: next, }) if err != nil { return err diff --git a/pkg/controllers/report/background/controller.go b/pkg/controllers/report/background/controller.go index 8a44e04214..9d7fc1ef8a 100644 --- a/pkg/controllers/report/background/controller.go +++ b/pkg/controllers/report/background/controller.go @@ -2,7 +2,6 @@ package background import ( "context" - "reflect" "time" "github.com/go-logr/logr" @@ -24,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" corev1informers "k8s.io/client-go/informers/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" metadatainformers "k8s.io/client-go/metadata/metadatainformer" @@ -190,36 +188,6 @@ func (c *controller) fetchPolicies(logger logr.Logger, namespace string) ([]kyve return policies, nil } -// reportsAreIdentical we expect reports are sorted before comparing them -func reportsAreIdentical(before, after kyvernov1alpha2.ReportInterface) bool { - bLabels := sets.NewString() - aLabels := sets.NewString() - for key := range before.GetLabels() { - bLabels.Insert(key) - } - for key := range after.GetLabels() { - aLabels.Insert(key) - } - if !aLabels.Equal(bLabels) { - return false - } - b := before.GetResults() - a := after.GetResults() - if len(a) != len(b) { - return false - } - for i := range a { - a := a[i] - b := b[i] - a.Timestamp = metav1.Timestamp{} - b.Timestamp = metav1.Timestamp{} - if !reflect.DeepEqual(&a, &b) { - return false - } - } - return true -} - func (c *controller) updateReport(ctx context.Context, meta metav1.Object, gvk schema.GroupVersionKind, resource resource.Resource) error { namespace := meta.GetNamespace() labels := meta.GetLabels() @@ -273,7 +241,7 @@ func (c *controller) updateReport(ctx context.Context, meta metav1.Object, gvk s } } reportutils.SetResponses(report, responses...) - if reportsAreIdentical(before, report) { + if utils.ReportsAreIdentical(before, report) { return nil } _, err = reportutils.UpdateReport(ctx, report, c.kyvernoClient) @@ -352,7 +320,7 @@ func (c *controller) updateReport(ctx context.Context, meta metav1.Object, gvk s } } reportutils.SetResults(report, ruleResults...) - if reportsAreIdentical(before, report) { + if utils.ReportsAreIdentical(before, report) { return nil } _, err = reportutils.UpdateReport(ctx, report, c.kyvernoClient) diff --git a/pkg/controllers/report/resource/controller.go b/pkg/controllers/report/resource/controller.go index 4266e28303..30ab7d64a7 100644 --- a/pkg/controllers/report/resource/controller.go +++ b/pkg/controllers/report/resource/controller.go @@ -2,7 +2,6 @@ package resource import ( "context" - "errors" "sync" "time" @@ -46,6 +45,7 @@ type EventHandler func(types.UID, schema.GroupVersionKind, Resource) type MetadataCache interface { GetResourceHash(uid types.UID) (Resource, schema.GroupVersionKind, bool) AddEventHandler(EventHandler) + Warmup(ctx context.Context) error } type Controller interface { @@ -92,8 +92,13 @@ func NewController( return &c } +func (c *controller) Warmup(ctx context.Context) error { + return c.updateDynamicWatchers(ctx) +} + func (c *controller) Run(ctx context.Context, workers int) { controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile) + c.stopDynamicWatchers() } func (c *controller) GetResourceHash(uid types.UID) (Resource, schema.GroupVersionKind, bool) { @@ -151,6 +156,7 @@ func (c *controller) updateDynamicWatchers(ctx context.Context) error { } dynamicWatchers := map[schema.GroupVersionResource]*watcher{} for gvk, gvr := range gvrs { + logger := logger.WithValues("gvr", gvr, "gvk", gvk) // if we already have one, transfer it to the new map if c.dynamicWatchers[gvr] != nil { dynamicWatchers[gvr] = c.dynamicWatchers[gvr] @@ -159,7 +165,7 @@ func (c *controller) updateDynamicWatchers(ctx context.Context) error { hashes := map[types.UID]Resource{} objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{}) if err != nil { - logger.Error(err, "failed to list resources", "gvr", gvr) + logger.Error(err, "failed to list resources") } else { resourceVersion := objs.GetResourceVersion() for _, obj := range objs.Items { @@ -172,17 +178,18 @@ func (c *controller) updateDynamicWatchers(ctx context.Context) error { } c.notify(uid, gvk, hashes[uid]) } - logger.Info("start watcher ...", "gvr", gvr, "resourceVersion", resourceVersion) + logger := logger.WithValues("resourceVersion", resourceVersion) + logger.Info("start watcher ...") watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { - watch, err := c.client.GetDynamicInterface().Resource(gvr).Watch(ctx, options) - if err != nil && !errors.Is(err, context.Canceled) { - logger.Error(err, "failed to watch", "gvr", gvr) + watch, err := c.client.GetDynamicInterface().Resource(gvr).Watch(context.Background(), options) + if err != nil { + logger.Error(err, "failed to watch") } return watch, err } watchInterface, err := watchTools.NewRetryWatcher(resourceVersion, &cache.ListWatch{WatchFunc: watchFunc}) if err != nil { - logger.Error(err, "failed to create watcher", "gvr", gvr) + logger.Error(err, "failed to create watcher") } else { w := &watcher{ watcher: watchInterface, @@ -221,6 +228,15 @@ func (c *controller) updateDynamicWatchers(ctx context.Context) error { return nil } +func (c *controller) stopDynamicWatchers() { + c.lock.Lock() + defer c.lock.Unlock() + for _, watcher := range c.dynamicWatchers { + watcher.watcher.Stop() + } + c.dynamicWatchers = map[schema.GroupVersionResource]*watcher{} +} + func (c *controller) notify(uid types.UID, gvk schema.GroupVersionKind, obj Resource) { for _, handler := range c.eventHandlers { handler(uid, gvk, obj) diff --git a/pkg/controllers/report/utils/utils.go b/pkg/controllers/report/utils/utils.go index dba3144fcc..0fa7cb7af6 100644 --- a/pkg/controllers/report/utils/utils.go +++ b/pkg/controllers/report/utils/utils.go @@ -1,10 +1,14 @@ package utils import ( + "reflect" + "github.com/go-logr/logr" kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1" + kyvernov1alpha2 "github.com/kyverno/kyverno/api/kyverno/v1alpha2" "github.com/kyverno/kyverno/pkg/autogen" "github.com/kyverno/kyverno/pkg/policy" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" ) @@ -50,3 +54,32 @@ func RemoveNonValidationPolicies(logger logr.Logger, policies ...kyvernov1.Polic } return validationPolicies } + +func ReportsAreIdentical(before, after kyvernov1alpha2.ReportInterface) bool { + bLabels := sets.NewString() + aLabels := sets.NewString() + for key := range before.GetLabels() { + bLabels.Insert(key) + } + for key := range after.GetLabels() { + aLabels.Insert(key) + } + if !aLabels.Equal(bLabels) { + return false + } + b := before.GetResults() + a := after.GetResults() + if len(a) != len(b) { + return false + } + for i := range a { + a := a[i] + b := b[i] + a.Timestamp = metav1.Timestamp{} + b.Timestamp = metav1.Timestamp{} + if !reflect.DeepEqual(&a, &b) { + return false + } + } + return true +} diff --git a/pkg/utils/controller/metadata.go b/pkg/utils/controller/metadata.go index e5105e74f0..0a8cf70d42 100644 --- a/pkg/utils/controller/metadata.go +++ b/pkg/utils/controller/metadata.go @@ -23,6 +23,15 @@ func CheckLabel(obj metav1.Object, key, value string) bool { return labels[key] == value } +func HasLabel(obj metav1.Object, key string) bool { + labels := obj.GetLabels() + if labels == nil { + return false + } + _, exists := labels[key] + return exists +} + func SetAnnotation(obj metav1.Object, key, value string) { annotations := obj.GetAnnotations() if annotations == nil { diff --git a/pkg/utils/controller/run.go b/pkg/utils/controller/run.go index b6c2196834..50227b1514 100644 --- a/pkg/utils/controller/run.go +++ b/pkg/utils/controller/run.go @@ -91,7 +91,7 @@ func reconcile(ctx context.Context, logger logr.Logger, obj interface{}, r recon } } logger = logger.WithValues("key", k, "namespace", ns, "name", n) - logger.V(6).Info("reconciling ...") - defer logger.V(6).Info("done", "duration", time.Since(start).String()) + logger.Info("reconciling ...") + defer logger.Info("done", "duration", time.Since(start).String()) return r(ctx, logger, k, ns, n) } diff --git a/pkg/utils/report/labels.go b/pkg/utils/report/labels.go index 49808229d4..d322bf525f 100644 --- a/pkg/utils/report/labels.go +++ b/pkg/utils/report/labels.go @@ -24,6 +24,8 @@ const ( LabelDomainPolicy = "pol.kyverno.io" LabelPrefixClusterPolicy = LabelDomainClusterPolicy + "/" LabelPrefixPolicy = LabelDomainPolicy + "/" + // aggregated admission report label + LabelAggregatedReport = "audit.kyverno.io/report.aggregate" ) func IsPolicyLabel(label string) bool { diff --git a/pkg/utils/report/new.go b/pkg/utils/report/new.go index da7e6b904a..63a87dc5fb 100644 --- a/pkg/utils/report/new.go +++ b/pkg/utils/report/new.go @@ -12,41 +12,30 @@ import ( "k8s.io/apimachinery/pkg/types" ) -func NewAdmissionReport(resource unstructured.Unstructured, request *admissionv1.AdmissionRequest, gvk metav1.GroupVersionKind, responses ...*response.EngineResponse) kyvernov1alpha2.ReportInterface { - name := string(request.UID) - namespace := resource.GetNamespace() - owner := resource.GetName() - uid := resource.GetUID() +func NewAdmissionReport(namespace, name, owner string, uid types.UID, gvk metav1.GroupVersionKind) kyvernov1alpha2.ReportInterface { + ownerRef := metav1.OwnerReference{ + APIVersion: metav1.GroupVersion{Group: gvk.Group, Version: gvk.Version}.String(), + Kind: gvk.Kind, + Name: owner, + UID: uid, + } var report kyvernov1alpha2.ReportInterface if namespace == "" { - report = &kyvernov1alpha2.ClusterAdmissionReport{ - Spec: kyvernov1alpha2.AdmissionReportSpec{ - Owner: metav1.OwnerReference{ - APIVersion: metav1.GroupVersion{Group: gvk.Group, Version: gvk.Version}.String(), - Kind: gvk.Kind, - Name: owner, - UID: uid, - }, - }, - } + report = &kyvernov1alpha2.ClusterAdmissionReport{Spec: kyvernov1alpha2.AdmissionReportSpec{Owner: ownerRef}} } else { - report = &kyvernov1alpha2.AdmissionReport{ - Spec: kyvernov1alpha2.AdmissionReportSpec{ - Owner: metav1.OwnerReference{ - APIVersion: metav1.GroupVersion{Group: gvk.Group, Version: gvk.Version}.String(), - Kind: gvk.Kind, - Name: owner, - UID: uid, - }, - }, - } + report = &kyvernov1alpha2.AdmissionReport{Spec: kyvernov1alpha2.AdmissionReportSpec{Owner: ownerRef}} } report.SetName(name) report.SetNamespace(namespace) SetResourceLabels(report, uid) + SetManagedByKyvernoLabel(report) + return report +} + +func BuildAdmissionReport(resource unstructured.Unstructured, request *admissionv1.AdmissionRequest, gvk metav1.GroupVersionKind, responses ...*response.EngineResponse) kyvernov1alpha2.ReportInterface { + report := NewAdmissionReport(resource.GetNamespace(), string(request.UID), resource.GetName(), resource.GetUID(), gvk) SetResourceVersionLabels(report, &resource) SetResponses(report, responses...) - SetManagedByKyvernoLabel(report) return report } diff --git a/pkg/webhooks/resource/imageverification/handler.go b/pkg/webhooks/resource/imageverification/handler.go index f0e7034d74..dff4bbed52 100644 --- a/pkg/webhooks/resource/imageverification/handler.go +++ b/pkg/webhooks/resource/imageverification/handler.go @@ -147,7 +147,7 @@ func (v *imageVerificationHandler) handleAudit( if !reportutils.IsGvkSupported(schema.GroupVersionKind(request.Kind)) { return } - report := reportutils.NewAdmissionReport(resource, request, request.Kind, engineResponses...) + report := reportutils.BuildAdmissionReport(resource, request, request.Kind, engineResponses...) // if it's not a creation, the resource already exists, we can set the owner if request.Operation != admissionv1.Create { gv := metav1.GroupVersion{Group: request.Kind.Group, Version: request.Kind.Version} diff --git a/pkg/webhooks/resource/validation/validation.go b/pkg/webhooks/resource/validation/validation.go index 594f95ef65..eacb82ee36 100644 --- a/pkg/webhooks/resource/validation/validation.go +++ b/pkg/webhooks/resource/validation/validation.go @@ -172,7 +172,7 @@ func (v *validationHandler) handleAudit( v.log.Error(err, "failed to build audit responses") } responses = append(responses, engineResponses...) - report := reportutils.NewAdmissionReport(resource, request, request.Kind, responses...) + report := reportutils.BuildAdmissionReport(resource, request, request.Kind, responses...) // if it's not a creation, the resource already exists, we can set the owner if request.Operation != admissionv1.Create { gv := metav1.GroupVersion{Group: request.Kind.Group, Version: request.Kind.Version}