From 287eb84d07b50985c895d573dce7d186fb6f7bf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Edouard=20Br=C3=A9t=C3=A9ch=C3=A9?= Date: Fri, 30 Sep 2022 13:24:47 +0200 Subject: [PATCH] refactor: use context in controllers instead of chan (#4761) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Charles-Edouard Brétéché --- cmd/kyverno/main.go | 8 +-- pkg/controllers/certmanager/controller.go | 52 +++++++++---------- pkg/controllers/config/controller.go | 11 ++-- pkg/controllers/controller.go | 4 +- pkg/controllers/policycache/controller.go | 8 +-- .../report/admission/controller.go | 24 ++++----- .../report/aggregate/controller.go | 48 ++++++++--------- .../report/background/controller.go | 26 +++++----- pkg/controllers/report/resource/controller.go | 14 ++--- pkg/utils/controller/run.go | 24 ++++----- pkg/utils/controller/utils.go | 14 ++--- pkg/utils/report/create.go | 14 ++--- pkg/utils/report/delete.go | 14 ++--- pkg/utils/report/update.go | 14 ++--- .../resource/validation/validation.go | 3 +- 15 files changed, 143 insertions(+), 135 deletions(-) diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 480ee06e61..c1ec62e94b 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -464,7 +464,7 @@ func main() { os.Exit(1) } webhookCfg.UpdateWebhookChan <- true - go certManager.Run(stopCh) + go certManager.Run(signalCtx) go policyCtrl.Run(2, stopCh) reportControllers := setupReportControllers( @@ -481,7 +481,7 @@ func main() { metadataInformer.WaitForCacheSync(stopCh) for _, controller := range reportControllers { - go controller.Run(stopCh) + go controller.Run(signalCtx) } } @@ -516,10 +516,10 @@ func main() { // init events handlers // start Kyverno controllers - go policyCacheController.Run(stopCh) + go policyCacheController.Run(signalCtx) go urc.Run(genWorkers, stopCh) go le.Run(signalCtx) - go configurationController.Run(stopCh) + go configurationController.Run(signalCtx) go eventGenerator.Run(3, stopCh) if !debug { go webhookMonitor.Run(webhookCfg, certRenewer, eventGenerator, stopCh) diff --git a/pkg/controllers/certmanager/controller.go b/pkg/controllers/certmanager/controller.go index 9db0375c17..0ce436d7c1 100644 --- a/pkg/controllers/certmanager/controller.go +++ b/pkg/controllers/certmanager/controller.go @@ -1,12 +1,14 @@ package certmanager import ( + "context" "os" "reflect" "time" "github.com/kyverno/kyverno/pkg/common" "github.com/kyverno/kyverno/pkg/config" + "github.com/kyverno/kyverno/pkg/controllers" "github.com/kyverno/kyverno/pkg/tls" corev1 "k8s.io/api/core/v1" corev1informers "k8s.io/client-go/informers/core/v1" @@ -15,9 +17,7 @@ import ( ) type Controller interface { - // Run starts the certManager - Run(stopCh <-chan struct{}) - + controllers.Controller // GetTLSPemPair gets the existing TLSPemPair from the secret GetTLSPemPair() ([]byte, []byte, error) } @@ -46,6 +46,29 @@ func NewController(secretInformer corev1informers.SecretInformer, certRenewer *t return manager, nil } +func (m *controller) Run(ctx context.Context) { + logger.Info("start managing certificate") + certsRenewalTicker := time.NewTicker(tls.CertRenewalInterval) + defer certsRenewalTicker.Stop() + for { + select { + case <-certsRenewalTicker.C: + if err := m.renewCertificates(); err != nil { + logger.Error(err, "unable to renew certificates, force restarting") + os.Exit(1) + } + case <-m.secretQueue: + if err := m.renewCertificates(); err != nil { + logger.Error(err, "unable to renew certificates, force restarting") + os.Exit(1) + } + case <-ctx.Done(): + logger.V(2).Info("stopping cert renewer") + return + } + } +} + func (m *controller) addSecretFunc(obj interface{}) { secret := obj.(*corev1.Secret) if secret.GetNamespace() == config.KyvernoNamespace() && secret.GetName() == tls.GenerateTLSPairSecretName() { @@ -98,26 +121,3 @@ func (m *controller) GetCAPem() ([]byte, error) { } return result, nil } - -func (m *controller) Run(stopCh <-chan struct{}) { - logger.Info("start managing certificate") - certsRenewalTicker := time.NewTicker(tls.CertRenewalInterval) - defer certsRenewalTicker.Stop() - for { - select { - case <-certsRenewalTicker.C: - if err := m.renewCertificates(); err != nil { - logger.Error(err, "unable to renew certificates, force restarting") - os.Exit(1) - } - case <-m.secretQueue: - if err := m.renewCertificates(); err != nil { - logger.Error(err, "unable to renew certificates, force restarting") - os.Exit(1) - } - case <-stopCh: - logger.V(2).Info("stopping cert renewer") - return - } - } -} diff --git a/pkg/controllers/config/controller.go b/pkg/controllers/config/controller.go index ded3711234..e8f46c7deb 100644 --- a/pkg/controllers/config/controller.go +++ b/pkg/controllers/config/controller.go @@ -1,8 +1,11 @@ package config import ( + "context" + "github.com/go-logr/logr" "github.com/kyverno/kyverno/pkg/config" + "github.com/kyverno/kyverno/pkg/controllers" controllerutils "github.com/kyverno/kyverno/pkg/utils/controller" "k8s.io/apimachinery/pkg/api/errors" corev1informers "k8s.io/client-go/informers/core/v1" @@ -29,7 +32,7 @@ type controller struct { queue workqueue.RateLimitingInterface } -func NewController(configuration config.Configuration, configmapInformer corev1informers.ConfigMapInformer) *controller { +func NewController(configuration config.Configuration, configmapInformer corev1informers.ConfigMapInformer) controllers.Controller { c := controller{ configuration: configuration, configmapLister: configmapInformer.Lister(), @@ -41,11 +44,11 @@ func NewController(configuration config.Configuration, configmapInformer corev1i return &c } -func (c *controller) Run(stopCh <-chan struct{}) { - controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh, c.configmapSynced) +func (c *controller) Run(ctx context.Context) { + controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, c.configmapSynced) } -func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { +func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error { if namespace != config.KyvernoNamespace() || name != config.KyvernoConfigMapName() { return nil } diff --git a/pkg/controllers/controller.go b/pkg/controllers/controller.go index 64eedb03cd..f7d8d5aca9 100644 --- a/pkg/controllers/controller.go +++ b/pkg/controllers/controller.go @@ -1,6 +1,8 @@ package controllers +import "context" + type Controller interface { // Run starts the controller - Run(stopCh <-chan struct{}) + Run(context.Context) } diff --git a/pkg/controllers/policycache/controller.go b/pkg/controllers/policycache/controller.go index e0851823fc..99a5a2d6b4 100644 --- a/pkg/controllers/policycache/controller.go +++ b/pkg/controllers/policycache/controller.go @@ -1,6 +1,8 @@ package policycache import ( + "context" + "github.com/go-logr/logr" kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1" kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1" @@ -78,11 +80,11 @@ func (c *controller) WarmUp() error { return nil } -func (c *controller) Run(stopCh <-chan struct{}) { - controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh, c.cpolSynced, c.polSynced) +func (c *controller) Run(ctx context.Context) { + controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, c.cpolSynced, c.polSynced) } -func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { +func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error { policy, err := c.loadPolicy(namespace, name) if err != nil { if errors.IsNotFound(err) { diff --git a/pkg/controllers/report/admission/controller.go b/pkg/controllers/report/admission/controller.go index a03ffdef24..2a5dd0d072 100644 --- a/pkg/controllers/report/admission/controller.go +++ b/pkg/controllers/report/admission/controller.go @@ -62,7 +62,7 @@ func NewController( return &c } -func (c *controller) Run(stopCh <-chan struct{}) { +func (c *controller) Run(ctx context.Context) { c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, _ resource.Resource) { selector, err := reportutils.SelectorResourceUidEquals(uid) if err != nil { @@ -72,7 +72,7 @@ func (c *controller) Run(stopCh <-chan struct{}) { logger.Error(err, "failed to enqueue") } }) - controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh) + controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile) } func (c *controller) enqueue(selector labels.Selector) error { @@ -116,23 +116,23 @@ func (c *controller) getMeta(namespace, name string) (metav1.Object, error) { } } -func (c *controller) deleteReport(namespace, name string) error { +func (c *controller) deleteReport(ctx context.Context, namespace, name string) error { if namespace == "" { - return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Delete(context.TODO(), name, metav1.DeleteOptions{}) + return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Delete(ctx, name, metav1.DeleteOptions{}) } else { - return c.client.KyvernoV1alpha2().AdmissionReports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) + return c.client.KyvernoV1alpha2().AdmissionReports(namespace).Delete(ctx, name, metav1.DeleteOptions{}) } } -func (c *controller) getReport(namespace, name string) (kyvernov1alpha2.ReportInterface, error) { +func (c *controller) getReport(ctx context.Context, namespace, name string) (kyvernov1alpha2.ReportInterface, error) { if namespace == "" { - return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Get(context.TODO(), name, metav1.GetOptions{}) + return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Get(ctx, name, metav1.GetOptions{}) } else { - return c.client.KyvernoV1alpha2().AdmissionReports(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + return c.client.KyvernoV1alpha2().AdmissionReports(namespace).Get(ctx, name, metav1.GetOptions{}) } } -func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { +func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error { // try to find meta from the cache meta, err := c.getMeta(namespace, name) if err != nil { @@ -146,12 +146,12 @@ func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) resource, gvk, exists := c.metadataCache.GetResourceHash(uid) // set owner if not done yet if exists && len(meta.GetOwnerReferences()) == 0 { - report, err := c.getReport(namespace, name) + report, err := c.getReport(ctx, namespace, name) if err != nil { return err } controllerutils.SetOwner(report, gvk.GroupVersion().String(), gvk.Kind, resource.Name, uid) - _, err = reportutils.UpdateReport(report, c.client) + _, err = reportutils.UpdateReport(ctx, report, c.client) return err } // cleanup old reports @@ -159,7 +159,7 @@ func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) // and were created more than five minutes ago if !exists || !reportutils.CompareHash(meta, resource.Hash) { if meta.GetCreationTimestamp().Add(time.Minute * 5).Before(time.Now()) { - return c.deleteReport(namespace, name) + return c.deleteReport(ctx, namespace, name) } } return nil diff --git a/pkg/controllers/report/aggregate/controller.go b/pkg/controllers/report/aggregate/controller.go index ea9e48ed8f..b0802de8f4 100644 --- a/pkg/controllers/report/aggregate/controller.go +++ b/pkg/controllers/report/aggregate/controller.go @@ -81,14 +81,14 @@ func NewController( return &c } -func (c *controller) Run(stopCh <-chan struct{}) { - controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh) +func (c *controller) Run(ctx context.Context) { + controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile) } -func (c *controller) listAdmissionReports(namespace string) ([]kyvernov1alpha2.ReportInterface, error) { +func (c *controller) listAdmissionReports(ctx context.Context, namespace string) ([]kyvernov1alpha2.ReportInterface, error) { var reports []kyvernov1alpha2.ReportInterface if namespace == "" { - cadms, err := c.client.KyvernoV1alpha2().ClusterAdmissionReports().List(context.TODO(), metav1.ListOptions{}) + cadms, err := c.client.KyvernoV1alpha2().ClusterAdmissionReports().List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } @@ -96,7 +96,7 @@ func (c *controller) listAdmissionReports(namespace string) ([]kyvernov1alpha2.R reports = append(reports, &cadms.Items[i]) } } else { - adms, err := c.client.KyvernoV1alpha2().AdmissionReports(namespace).List(context.TODO(), metav1.ListOptions{}) + adms, err := c.client.KyvernoV1alpha2().AdmissionReports(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } @@ -107,10 +107,10 @@ func (c *controller) listAdmissionReports(namespace string) ([]kyvernov1alpha2.R return reports, nil } -func (c *controller) listBackgroundScanReports(namespace string) ([]kyvernov1alpha2.ReportInterface, error) { +func (c *controller) listBackgroundScanReports(ctx context.Context, namespace string) ([]kyvernov1alpha2.ReportInterface, error) { var reports []kyvernov1alpha2.ReportInterface if namespace == "" { - cbgscans, err := c.client.KyvernoV1alpha2().ClusterBackgroundScanReports().List(context.TODO(), metav1.ListOptions{}) + cbgscans, err := c.client.KyvernoV1alpha2().ClusterBackgroundScanReports().List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } @@ -118,7 +118,7 @@ func (c *controller) listBackgroundScanReports(namespace string) ([]kyvernov1alp reports = append(reports, &cbgscans.Items[i]) } } else { - bgscans, err := c.client.KyvernoV1alpha2().BackgroundScanReports(namespace).List(context.TODO(), metav1.ListOptions{}) + bgscans, err := c.client.KyvernoV1alpha2().BackgroundScanReports(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } @@ -129,26 +129,26 @@ func (c *controller) listBackgroundScanReports(namespace string) ([]kyvernov1alp return reports, nil } -func (c *controller) reconcileReport(report kyvernov1alpha2.ReportInterface, namespace, name string, results ...policyreportv1alpha2.PolicyReportResult) (kyvernov1alpha2.ReportInterface, error) { +func (c *controller) reconcileReport(ctx context.Context, report kyvernov1alpha2.ReportInterface, namespace, name string, results ...policyreportv1alpha2.PolicyReportResult) (kyvernov1alpha2.ReportInterface, error) { if report == nil { - return reportutils.CreateReport(c.client, reportutils.NewPolicyReport(namespace, name, results...)) + return reportutils.CreateReport(ctx, reportutils.NewPolicyReport(namespace, name, results...), c.client) } after := reportutils.DeepCopy(report) reportutils.SetResults(after, results...) if reflect.DeepEqual(report, after) { return after, nil } - return reportutils.UpdateReport(after, c.client) + return reportutils.UpdateReport(ctx, after, c.client) } -func (c *controller) cleanReports(actual map[string]kyvernov1alpha2.ReportInterface, expected []kyvernov1alpha2.ReportInterface) error { +func (c *controller) cleanReports(ctx context.Context, actual map[string]kyvernov1alpha2.ReportInterface, expected []kyvernov1alpha2.ReportInterface) error { keep := sets.NewString() for _, obj := range expected { keep.Insert(obj.GetName()) } for _, obj := range actual { if !keep.Has(obj.GetName()) { - err := reportutils.DeleteReport(obj, c.client) + err := reportutils.DeleteReport(ctx, obj, c.client) if err != nil { return err } @@ -181,17 +181,17 @@ func mergeReports(accumulator map[string]policyreportv1alpha2.PolicyReportResult } } -func (c *controller) buildReportsResults(namepsace string) ([]policyreportv1alpha2.PolicyReportResult, error) { +func (c *controller) buildReportsResults(ctx context.Context, namepsace string) ([]policyreportv1alpha2.PolicyReportResult, error) { merged := map[string]policyreportv1alpha2.PolicyReportResult{} { - reports, err := c.listAdmissionReports(namepsace) + reports, err := c.listAdmissionReports(ctx, namepsace) if err != nil { return nil, err } mergeReports(merged, reports...) } { - reports, err := c.listBackgroundScanReports(namepsace) + reports, err := c.listBackgroundScanReports(ctx, namepsace) if err != nil { return nil, err } @@ -204,10 +204,10 @@ func (c *controller) buildReportsResults(namepsace string) ([]policyreportv1alph return results, nil } -func (c *controller) getPolicyReports(namespace string) ([]kyvernov1alpha2.ReportInterface, error) { +func (c *controller) getPolicyReports(ctx context.Context, namespace string) ([]kyvernov1alpha2.ReportInterface, error) { var reports []kyvernov1alpha2.ReportInterface if namespace == "" { - list, err := c.client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().List(context.TODO(), metav1.ListOptions{}) + list, err := c.client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } @@ -215,7 +215,7 @@ func (c *controller) getPolicyReports(namespace string) ([]kyvernov1alpha2.Repor reports = append(reports, &list.Items[i]) } } else { - list, err := c.client.Wgpolicyk8sV1alpha2().PolicyReports(namespace).List(context.TODO(), metav1.ListOptions{}) + list, err := c.client.Wgpolicyk8sV1alpha2().PolicyReports(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } @@ -226,12 +226,12 @@ func (c *controller) getPolicyReports(namespace string) ([]kyvernov1alpha2.Repor return reports, nil } -func (c *controller) reconcile(logger logr.Logger, key, _, _ string) error { - results, err := c.buildReportsResults(key) +func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, _, _ string) error { + results, err := c.buildReportsResults(ctx, key) if err != nil { return err } - policyReports, err := c.getPolicyReports(key) + policyReports, err := c.getPolicyReports(ctx, key) if err != nil { return err } @@ -255,12 +255,12 @@ func (c *controller) reconcile(logger logr.Logger, key, _, _ string) error { if i > 0 { name = fmt.Sprintf("%s-%d", name, i/chunkSize) } - report, err := c.reconcileReport(actual[name], key, name, results[i:end]...) + report, err := c.reconcileReport(ctx, actual[name], key, name, results[i:end]...) if err != nil { return err } expected = append(expected, report) } } - return c.cleanReports(actual, expected) + return c.cleanReports(ctx, actual, expected) } diff --git a/pkg/controllers/report/background/controller.go b/pkg/controllers/report/background/controller.go index 3d032add10..157cace435 100644 --- a/pkg/controllers/report/background/controller.go +++ b/pkg/controllers/report/background/controller.go @@ -85,7 +85,7 @@ func NewController( return &c } -func (c *controller) Run(stopCh <-chan struct{}) { +func (c *controller) Run(ctx context.Context) { c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, resource resource.Resource) { selector, err := reportutils.SelectorResourceUidEquals(uid) if err != nil { @@ -100,7 +100,7 @@ func (c *controller) Run(stopCh <-chan struct{}) { c.queue.Add(resource.Namespace + "/" + string(uid)) } }) - controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh) + controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile) } func (c *controller) addPolicy(obj interface{}) { @@ -184,7 +184,7 @@ func (c *controller) fetchPolicies(logger logr.Logger, namespace string) ([]kyve return policies, nil } -func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKind, resource resource.Resource) error { +func (c *controller) updateReport(ctx context.Context, meta metav1.Object, gvk schema.GroupVersionKind, resource resource.Resource) error { namespace := meta.GetNamespace() labels := meta.GetLabels() // load all policies @@ -207,7 +207,7 @@ func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKin // if the resource changed, we need to rebuild the report if !reportutils.CompareHash(meta, resource.Hash) { scanner := utils.NewScanner(logger, c.client) - before, err := c.getReport(meta.GetNamespace(), meta.GetName()) + before, err := c.getReport(ctx, meta.GetNamespace(), meta.GetName()) if err != nil { return nil } @@ -240,7 +240,7 @@ func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKin if reflect.DeepEqual(before, report) { return nil } - _, err = reportutils.UpdateReport(report, c.kyvernoClient) + _, err = reportutils.UpdateReport(ctx, report, c.kyvernoClient) return err } else { expected := map[string]kyvernov1.PolicyInterface{} @@ -275,7 +275,7 @@ func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKin if len(toDelete) == 0 && len(toCreate) == 0 { return nil } - before, err := c.getReport(meta.GetNamespace(), meta.GetName()) + before, err := c.getReport(ctx, meta.GetNamespace(), meta.GetName()) if err != nil { return err } @@ -319,16 +319,16 @@ func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKin if reflect.DeepEqual(before, report) { return nil } - _, err = reportutils.UpdateReport(report, c.kyvernoClient) + _, err = reportutils.UpdateReport(ctx, report, c.kyvernoClient) return err } } -func (c *controller) getReport(namespace, name string) (kyvernov1alpha2.ReportInterface, error) { +func (c *controller) getReport(ctx context.Context, namespace, name string) (kyvernov1alpha2.ReportInterface, error) { if namespace == "" { - return c.kyvernoClient.KyvernoV1alpha2().ClusterBackgroundScanReports().Get(context.TODO(), name, metav1.GetOptions{}) + return c.kyvernoClient.KyvernoV1alpha2().ClusterBackgroundScanReports().Get(ctx, name, metav1.GetOptions{}) } else { - return c.kyvernoClient.KyvernoV1alpha2().BackgroundScanReports(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + return c.kyvernoClient.KyvernoV1alpha2().BackgroundScanReports(namespace).Get(ctx, name, metav1.GetOptions{}) } } @@ -348,7 +348,7 @@ func (c *controller) getMeta(namespace, name string) (metav1.Object, error) { } } -func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { +func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error { // try to find resource from the cache uid := types.UID(name) resource, gvk, exists := c.metadataCache.GetResourceHash(uid) @@ -360,10 +360,10 @@ func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) if err != nil { if apierrors.IsNotFound(err) { // if there's no report yet, try to create an empty one - _, err = reportutils.CreateReport(c.kyvernoClient, reportutils.NewBackgroundScanReport(namespace, name, gvk, resource.Name, uid)) + _, err = reportutils.CreateReport(ctx, reportutils.NewBackgroundScanReport(namespace, name, gvk, resource.Name, uid), c.kyvernoClient) return err } return err } - return c.updateReport(report, gvk, resource) + return c.updateReport(ctx, report, gvk, resource) } diff --git a/pkg/controllers/report/resource/controller.go b/pkg/controllers/report/resource/controller.go index 9de1dbae1d..ebb3da41cd 100644 --- a/pkg/controllers/report/resource/controller.go +++ b/pkg/controllers/report/resource/controller.go @@ -84,8 +84,8 @@ func NewController( return &c } -func (c *controller) Run(stopCh <-chan struct{}) { - controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh) +func (c *controller) Run(ctx context.Context) { + controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile) } func (c *controller) GetResourceHash(uid types.UID) (Resource, schema.GroupVersionKind, bool) { @@ -110,7 +110,7 @@ func (c *controller) AddEventHandler(eventHandler EventHandler) { } } -func (c *controller) updateDynamicWatchers() error { +func (c *controller) updateDynamicWatchers(ctx context.Context) error { c.lock.Lock() defer c.lock.Unlock() clusterPolicies, err := c.fetchClusterPolicies(logger) @@ -139,7 +139,7 @@ func (c *controller) updateDynamicWatchers() error { delete(c.dynamicWatchers, gvr) } else { logger.Info("start watcher ...", "gvr", gvr) - watchInterface, err := c.client.GetDynamicInterface().Resource(gvr).Watch(context.TODO(), metav1.ListOptions{}) + watchInterface, err := c.client.GetDynamicInterface().Resource(gvr).Watch(ctx, metav1.ListOptions{}) if err != nil { logger.Error(err, "failed to create watcher", "gvr", gvr) } else { @@ -162,7 +162,7 @@ func (c *controller) updateDynamicWatchers() error { } } }() - objs, err := c.client.GetDynamicInterface().Resource(gvr).List(context.TODO(), metav1.ListOptions{}) + objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{}) if err != nil { logger.Error(err, "failed to list resources", "gvr", gvr) watchInterface.Stop() @@ -251,6 +251,6 @@ func (c *controller) fetchPolicies(logger logr.Logger, namespace string) ([]kyve return policies, nil } -func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { - return c.updateDynamicWatchers() +func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error { + return c.updateDynamicWatchers(ctx) } diff --git a/pkg/utils/controller/run.go b/pkg/utils/controller/run.go index 3ba231a418..5da9376605 100644 --- a/pkg/utils/controller/run.go +++ b/pkg/utils/controller/run.go @@ -13,18 +13,18 @@ import ( "k8s.io/client-go/util/workqueue" ) -type reconcileFunc func(logger logr.Logger, key string, namespace string, name string) error +type reconcileFunc func(ctx context.Context, logger logr.Logger, key string, namespace string, name string) error -func Run(controllerName string, logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) { +func Run(ctx context.Context, controllerName string, logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, cacheSyncs ...cache.InformerSynced) { logger.Info("starting ...") defer runtime.HandleCrash() defer logger.Info("stopped") var wg sync.WaitGroup func() { - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(ctx) defer cancel() defer queue.ShutDown() - if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { + if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), cacheSyncs...) { return } for i := 0; i < n; i++ { @@ -33,24 +33,24 @@ func Run(controllerName string, logger logr.Logger, queue workqueue.RateLimiting logger.Info("starting worker") defer wg.Done() defer logger.Info("worker stopped") - wait.Until(func() { worker(logger, queue, maxRetries, r) }, time.Second, ctx.Done()) + wait.UntilWithContext(ctx, func(ctx context.Context) { worker(ctx, logger, queue, maxRetries, r) }, time.Second) }(logger.WithValues("id", i)) } - <-stopCh + <-ctx.Done() }() logger.Info("waiting for workers to terminate ...") wg.Wait() } -func worker(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) { - for processNextWorkItem(logger, queue, maxRetries, r) { +func worker(ctx context.Context, logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) { + for processNextWorkItem(ctx, logger, queue, maxRetries, r) { } } -func processNextWorkItem(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) bool { +func processNextWorkItem(ctx context.Context, logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) bool { if obj, quit := queue.Get(); !quit { defer queue.Done(obj) - handleErr(logger, queue, maxRetries, reconcile(logger, obj, r), obj) + handleErr(logger, queue, maxRetries, reconcile(ctx, logger, obj, r), obj) return true } return false @@ -71,7 +71,7 @@ func handleErr(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRet } } -func reconcile(logger logr.Logger, obj interface{}, r reconcileFunc) error { +func reconcile(ctx context.Context, logger logr.Logger, obj interface{}, r reconcileFunc) error { start := time.Now() var k, ns, n string if key, ok := obj.(cache.ExplicitKey); ok { @@ -87,5 +87,5 @@ func reconcile(logger logr.Logger, obj interface{}, r reconcileFunc) error { logger = logger.WithValues("key", k, "namespace", ns, "name", n) logger.Info("reconciling ...") defer logger.Info("done", time.Since(start)) - return r(logger, k, ns, n) + return r(ctx, logger, k, ns, n) } diff --git a/pkg/utils/controller/utils.go b/pkg/utils/controller/utils.go index 53629c83d3..4c3663da7f 100644 --- a/pkg/utils/controller/utils.go +++ b/pkg/utils/controller/utils.go @@ -105,7 +105,7 @@ func GetOrNew[T any, R Object[T], G Getter[R]](name string, getter G) (R, error) return obj, nil } -func CreateOrUpdate[T any, R Object[T], G Getter[R], S Setter[R]](name string, getter G, setter S, build func(R) error) (R, error) { +func CreateOrUpdate[T any, R Object[T], G Getter[R], S Setter[R]](ctx context.Context, name string, getter G, setter S, build func(R) error) (R, error) { if obj, err := GetOrNew[T, R](name, getter); err != nil { return nil, err } else { @@ -114,19 +114,19 @@ func CreateOrUpdate[T any, R Object[T], G Getter[R], S Setter[R]](name string, g return nil, err } else { if obj.GetResourceVersion() == "" { - return setter.Create(context.TODO(), mutated, metav1.CreateOptions{}) + return setter.Create(ctx, mutated, metav1.CreateOptions{}) } else { if reflect.DeepEqual(obj, mutated) { return mutated, nil } else { - return setter.Update(context.TODO(), mutated, metav1.UpdateOptions{}) + return setter.Update(ctx, mutated, metav1.UpdateOptions{}) } } } } } -func Update[T any, R Object[T], S Setter[R]](setter S, obj R, build func(R) error) (R, error) { +func Update[T any, R Object[T], S Setter[R]](ctx context.Context, setter S, obj R, build func(R) error) (R, error) { mutated := obj.DeepCopy() if err := build(mutated); err != nil { return nil, err @@ -134,19 +134,19 @@ func Update[T any, R Object[T], S Setter[R]](setter S, obj R, build func(R) erro if reflect.DeepEqual(obj, mutated) { return mutated, nil } else { - return setter.Update(context.TODO(), mutated, metav1.UpdateOptions{}) + return setter.Update(ctx, mutated, metav1.UpdateOptions{}) } } } -func Cleanup[T any, R Object[T]](actual []R, expected []R, deleter Deleter) error { +func Cleanup[T any, R Object[T]](ctx context.Context, actual []R, expected []R, deleter Deleter) error { keep := sets.NewString() for _, obj := range expected { keep.Insert(obj.GetName()) } for _, obj := range actual { if !keep.Has(obj.GetName()) { - if err := deleter.Delete(context.TODO(), obj.GetName(), metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + if err := deleter.Delete(ctx, obj.GetName(), metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { return err } } diff --git a/pkg/utils/report/create.go b/pkg/utils/report/create.go index f81b6766ea..9af43e1328 100644 --- a/pkg/utils/report/create.go +++ b/pkg/utils/report/create.go @@ -10,25 +10,25 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func CreateReport(client versioned.Interface, report kyvernov1alpha2.ReportInterface) (kyvernov1alpha2.ReportInterface, error) { +func CreateReport(ctx context.Context, report kyvernov1alpha2.ReportInterface, client versioned.Interface) (kyvernov1alpha2.ReportInterface, error) { switch v := report.(type) { case *kyvernov1alpha2.AdmissionReport: - report, err := client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Create(context.TODO(), v, metav1.CreateOptions{}) + report, err := client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Create(ctx, v, metav1.CreateOptions{}) return report, err case *kyvernov1alpha2.ClusterAdmissionReport: - report, err := client.KyvernoV1alpha2().ClusterAdmissionReports().Create(context.TODO(), v, metav1.CreateOptions{}) + report, err := client.KyvernoV1alpha2().ClusterAdmissionReports().Create(ctx, v, metav1.CreateOptions{}) return report, err case *kyvernov1alpha2.BackgroundScanReport: - report, err := client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Create(context.TODO(), v, metav1.CreateOptions{}) + report, err := client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Create(ctx, v, metav1.CreateOptions{}) return report, err case *kyvernov1alpha2.ClusterBackgroundScanReport: - report, err := client.KyvernoV1alpha2().ClusterBackgroundScanReports().Create(context.TODO(), v, metav1.CreateOptions{}) + report, err := client.KyvernoV1alpha2().ClusterBackgroundScanReports().Create(ctx, v, metav1.CreateOptions{}) return report, err case *policyreportv1alpha2.PolicyReport: - report, err := client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Create(context.TODO(), v, metav1.CreateOptions{}) + report, err := client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Create(ctx, v, metav1.CreateOptions{}) return report, err case *policyreportv1alpha2.ClusterPolicyReport: - report, err := client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Create(context.TODO(), v, metav1.CreateOptions{}) + report, err := client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Create(ctx, v, metav1.CreateOptions{}) return report, err default: return nil, errors.New("unknow type") diff --git a/pkg/utils/report/delete.go b/pkg/utils/report/delete.go index 5d7ceea8cd..4ea76bd04e 100644 --- a/pkg/utils/report/delete.go +++ b/pkg/utils/report/delete.go @@ -10,20 +10,20 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func DeleteReport(report kyvernov1alpha2.ReportInterface, client versioned.Interface) error { +func DeleteReport(ctx context.Context, report kyvernov1alpha2.ReportInterface, client versioned.Interface) error { switch v := report.(type) { case *kyvernov1alpha2.AdmissionReport: - return client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) + return client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Delete(ctx, v.GetName(), metav1.DeleteOptions{}) case *kyvernov1alpha2.ClusterAdmissionReport: - return client.KyvernoV1alpha2().ClusterAdmissionReports().Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) + return client.KyvernoV1alpha2().ClusterAdmissionReports().Delete(ctx, v.GetName(), metav1.DeleteOptions{}) case *kyvernov1alpha2.BackgroundScanReport: - return client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) + return client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Delete(ctx, v.GetName(), metav1.DeleteOptions{}) case *kyvernov1alpha2.ClusterBackgroundScanReport: - return client.KyvernoV1alpha2().ClusterBackgroundScanReports().Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) + return client.KyvernoV1alpha2().ClusterBackgroundScanReports().Delete(ctx, v.GetName(), metav1.DeleteOptions{}) case *policyreportv1alpha2.PolicyReport: - return client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) + return client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Delete(ctx, v.GetName(), metav1.DeleteOptions{}) case *policyreportv1alpha2.ClusterPolicyReport: - return client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) + return client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Delete(ctx, v.GetName(), metav1.DeleteOptions{}) default: return errors.New("unknow type") } diff --git a/pkg/utils/report/update.go b/pkg/utils/report/update.go index f08a4e7d61..16af02a530 100644 --- a/pkg/utils/report/update.go +++ b/pkg/utils/report/update.go @@ -10,25 +10,25 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func UpdateReport(report kyvernov1alpha2.ReportInterface, client versioned.Interface) (kyvernov1alpha2.ReportInterface, error) { +func UpdateReport(ctx context.Context, report kyvernov1alpha2.ReportInterface, client versioned.Interface) (kyvernov1alpha2.ReportInterface, error) { switch v := report.(type) { case *kyvernov1alpha2.AdmissionReport: - report, err := client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Update(context.TODO(), v, metav1.UpdateOptions{}) + report, err := client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Update(ctx, v, metav1.UpdateOptions{}) return report, err case *kyvernov1alpha2.ClusterAdmissionReport: - report, err := client.KyvernoV1alpha2().ClusterAdmissionReports().Update(context.TODO(), v, metav1.UpdateOptions{}) + report, err := client.KyvernoV1alpha2().ClusterAdmissionReports().Update(ctx, v, metav1.UpdateOptions{}) return report, err case *kyvernov1alpha2.BackgroundScanReport: - report, err := client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Update(context.TODO(), v, metav1.UpdateOptions{}) + report, err := client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Update(ctx, v, metav1.UpdateOptions{}) return report, err case *kyvernov1alpha2.ClusterBackgroundScanReport: - report, err := client.KyvernoV1alpha2().ClusterBackgroundScanReports().Update(context.TODO(), v, metav1.UpdateOptions{}) + report, err := client.KyvernoV1alpha2().ClusterBackgroundScanReports().Update(ctx, v, metav1.UpdateOptions{}) return report, err case *policyreportv1alpha2.PolicyReport: - report, err := client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Update(context.TODO(), v, metav1.UpdateOptions{}) + report, err := client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Update(ctx, v, metav1.UpdateOptions{}) return report, err case *policyreportv1alpha2.ClusterPolicyReport: - report, err := client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Update(context.TODO(), v, metav1.UpdateOptions{}) + report, err := client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Update(ctx, v, metav1.UpdateOptions{}) return report, err default: return nil, errors.New("unknow type") diff --git a/pkg/webhooks/resource/validation/validation.go b/pkg/webhooks/resource/validation/validation.go index f84bec346a..7e311d351d 100644 --- a/pkg/webhooks/resource/validation/validation.go +++ b/pkg/webhooks/resource/validation/validation.go @@ -1,6 +1,7 @@ package validation import ( + "context" "reflect" "time" @@ -177,7 +178,7 @@ func (v *validationHandler) handleAudit( gv := metav1.GroupVersion{Group: request.Kind.Group, Version: request.Kind.Version} controllerutils.SetOwner(report, gv.String(), request.Kind.Kind, resource.GetName(), resource.GetUID()) } - _, err = reportutils.CreateReport(v.kyvernoClient, report) + _, err = reportutils.CreateReport(context.Background(), report, v.kyvernoClient) if err != nil { v.log.Error(err, "failed to create report") }