1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-05 15:37:19 +00:00

refactor: use context in controllers instead of chan (#4761)

Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
This commit is contained in:
Charles-Edouard Brétéché 2022-09-30 13:24:47 +02:00 committed by GitHub
parent c42851a37a
commit 287eb84d07
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 143 additions and 135 deletions

View file

@ -464,7 +464,7 @@ func main() {
os.Exit(1) os.Exit(1)
} }
webhookCfg.UpdateWebhookChan <- true webhookCfg.UpdateWebhookChan <- true
go certManager.Run(stopCh) go certManager.Run(signalCtx)
go policyCtrl.Run(2, stopCh) go policyCtrl.Run(2, stopCh)
reportControllers := setupReportControllers( reportControllers := setupReportControllers(
@ -481,7 +481,7 @@ func main() {
metadataInformer.WaitForCacheSync(stopCh) metadataInformer.WaitForCacheSync(stopCh)
for _, controller := range reportControllers { for _, controller := range reportControllers {
go controller.Run(stopCh) go controller.Run(signalCtx)
} }
} }
@ -516,10 +516,10 @@ func main() {
// init events handlers // init events handlers
// start Kyverno controllers // start Kyverno controllers
go policyCacheController.Run(stopCh) go policyCacheController.Run(signalCtx)
go urc.Run(genWorkers, stopCh) go urc.Run(genWorkers, stopCh)
go le.Run(signalCtx) go le.Run(signalCtx)
go configurationController.Run(stopCh) go configurationController.Run(signalCtx)
go eventGenerator.Run(3, stopCh) go eventGenerator.Run(3, stopCh)
if !debug { if !debug {
go webhookMonitor.Run(webhookCfg, certRenewer, eventGenerator, stopCh) go webhookMonitor.Run(webhookCfg, certRenewer, eventGenerator, stopCh)

View file

@ -1,12 +1,14 @@
package certmanager package certmanager
import ( import (
"context"
"os" "os"
"reflect" "reflect"
"time" "time"
"github.com/kyverno/kyverno/pkg/common" "github.com/kyverno/kyverno/pkg/common"
"github.com/kyverno/kyverno/pkg/config" "github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/controllers"
"github.com/kyverno/kyverno/pkg/tls" "github.com/kyverno/kyverno/pkg/tls"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
corev1informers "k8s.io/client-go/informers/core/v1" corev1informers "k8s.io/client-go/informers/core/v1"
@ -15,9 +17,7 @@ import (
) )
type Controller interface { type Controller interface {
// Run starts the certManager controllers.Controller
Run(stopCh <-chan struct{})
// GetTLSPemPair gets the existing TLSPemPair from the secret // GetTLSPemPair gets the existing TLSPemPair from the secret
GetTLSPemPair() ([]byte, []byte, error) GetTLSPemPair() ([]byte, []byte, error)
} }
@ -46,6 +46,29 @@ func NewController(secretInformer corev1informers.SecretInformer, certRenewer *t
return manager, nil return manager, nil
} }
func (m *controller) Run(ctx context.Context) {
logger.Info("start managing certificate")
certsRenewalTicker := time.NewTicker(tls.CertRenewalInterval)
defer certsRenewalTicker.Stop()
for {
select {
case <-certsRenewalTicker.C:
if err := m.renewCertificates(); err != nil {
logger.Error(err, "unable to renew certificates, force restarting")
os.Exit(1)
}
case <-m.secretQueue:
if err := m.renewCertificates(); err != nil {
logger.Error(err, "unable to renew certificates, force restarting")
os.Exit(1)
}
case <-ctx.Done():
logger.V(2).Info("stopping cert renewer")
return
}
}
}
func (m *controller) addSecretFunc(obj interface{}) { func (m *controller) addSecretFunc(obj interface{}) {
secret := obj.(*corev1.Secret) secret := obj.(*corev1.Secret)
if secret.GetNamespace() == config.KyvernoNamespace() && secret.GetName() == tls.GenerateTLSPairSecretName() { if secret.GetNamespace() == config.KyvernoNamespace() && secret.GetName() == tls.GenerateTLSPairSecretName() {
@ -98,26 +121,3 @@ func (m *controller) GetCAPem() ([]byte, error) {
} }
return result, nil return result, nil
} }
func (m *controller) Run(stopCh <-chan struct{}) {
logger.Info("start managing certificate")
certsRenewalTicker := time.NewTicker(tls.CertRenewalInterval)
defer certsRenewalTicker.Stop()
for {
select {
case <-certsRenewalTicker.C:
if err := m.renewCertificates(); err != nil {
logger.Error(err, "unable to renew certificates, force restarting")
os.Exit(1)
}
case <-m.secretQueue:
if err := m.renewCertificates(); err != nil {
logger.Error(err, "unable to renew certificates, force restarting")
os.Exit(1)
}
case <-stopCh:
logger.V(2).Info("stopping cert renewer")
return
}
}
}

View file

@ -1,8 +1,11 @@
package config package config
import ( import (
"context"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/config" "github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/controllers"
controllerutils "github.com/kyverno/kyverno/pkg/utils/controller" controllerutils "github.com/kyverno/kyverno/pkg/utils/controller"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
corev1informers "k8s.io/client-go/informers/core/v1" corev1informers "k8s.io/client-go/informers/core/v1"
@ -29,7 +32,7 @@ type controller struct {
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
} }
func NewController(configuration config.Configuration, configmapInformer corev1informers.ConfigMapInformer) *controller { func NewController(configuration config.Configuration, configmapInformer corev1informers.ConfigMapInformer) controllers.Controller {
c := controller{ c := controller{
configuration: configuration, configuration: configuration,
configmapLister: configmapInformer.Lister(), configmapLister: configmapInformer.Lister(),
@ -41,11 +44,11 @@ func NewController(configuration config.Configuration, configmapInformer corev1i
return &c return &c
} }
func (c *controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(ctx context.Context) {
controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh, c.configmapSynced) controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, c.configmapSynced)
} }
func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
if namespace != config.KyvernoNamespace() || name != config.KyvernoConfigMapName() { if namespace != config.KyvernoNamespace() || name != config.KyvernoConfigMapName() {
return nil return nil
} }

View file

@ -1,6 +1,8 @@
package controllers package controllers
import "context"
type Controller interface { type Controller interface {
// Run starts the controller // Run starts the controller
Run(stopCh <-chan struct{}) Run(context.Context)
} }

View file

@ -1,6 +1,8 @@
package policycache package policycache
import ( import (
"context"
"github.com/go-logr/logr" "github.com/go-logr/logr"
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1" kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1" kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
@ -78,11 +80,11 @@ func (c *controller) WarmUp() error {
return nil return nil
} }
func (c *controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(ctx context.Context) {
controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh, c.cpolSynced, c.polSynced) controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, c.cpolSynced, c.polSynced)
} }
func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
policy, err := c.loadPolicy(namespace, name) policy, err := c.loadPolicy(namespace, name)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {

View file

@ -62,7 +62,7 @@ func NewController(
return &c return &c
} }
func (c *controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(ctx context.Context) {
c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, _ resource.Resource) { c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, _ resource.Resource) {
selector, err := reportutils.SelectorResourceUidEquals(uid) selector, err := reportutils.SelectorResourceUidEquals(uid)
if err != nil { if err != nil {
@ -72,7 +72,7 @@ func (c *controller) Run(stopCh <-chan struct{}) {
logger.Error(err, "failed to enqueue") logger.Error(err, "failed to enqueue")
} }
}) })
controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh) controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
} }
func (c *controller) enqueue(selector labels.Selector) error { func (c *controller) enqueue(selector labels.Selector) error {
@ -116,23 +116,23 @@ func (c *controller) getMeta(namespace, name string) (metav1.Object, error) {
} }
} }
func (c *controller) deleteReport(namespace, name string) error { func (c *controller) deleteReport(ctx context.Context, namespace, name string) error {
if namespace == "" { if namespace == "" {
return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Delete(context.TODO(), name, metav1.DeleteOptions{}) return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Delete(ctx, name, metav1.DeleteOptions{})
} else { } else {
return c.client.KyvernoV1alpha2().AdmissionReports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) return c.client.KyvernoV1alpha2().AdmissionReports(namespace).Delete(ctx, name, metav1.DeleteOptions{})
} }
} }
func (c *controller) getReport(namespace, name string) (kyvernov1alpha2.ReportInterface, error) { func (c *controller) getReport(ctx context.Context, namespace, name string) (kyvernov1alpha2.ReportInterface, error) {
if namespace == "" { if namespace == "" {
return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Get(context.TODO(), name, metav1.GetOptions{}) return c.client.KyvernoV1alpha2().ClusterAdmissionReports().Get(ctx, name, metav1.GetOptions{})
} else { } else {
return c.client.KyvernoV1alpha2().AdmissionReports(namespace).Get(context.TODO(), name, metav1.GetOptions{}) return c.client.KyvernoV1alpha2().AdmissionReports(namespace).Get(ctx, name, metav1.GetOptions{})
} }
} }
func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
// try to find meta from the cache // try to find meta from the cache
meta, err := c.getMeta(namespace, name) meta, err := c.getMeta(namespace, name)
if err != nil { if err != nil {
@ -146,12 +146,12 @@ func (c *controller) reconcile(logger logr.Logger, key, namespace, name string)
resource, gvk, exists := c.metadataCache.GetResourceHash(uid) resource, gvk, exists := c.metadataCache.GetResourceHash(uid)
// set owner if not done yet // set owner if not done yet
if exists && len(meta.GetOwnerReferences()) == 0 { if exists && len(meta.GetOwnerReferences()) == 0 {
report, err := c.getReport(namespace, name) report, err := c.getReport(ctx, namespace, name)
if err != nil { if err != nil {
return err return err
} }
controllerutils.SetOwner(report, gvk.GroupVersion().String(), gvk.Kind, resource.Name, uid) controllerutils.SetOwner(report, gvk.GroupVersion().String(), gvk.Kind, resource.Name, uid)
_, err = reportutils.UpdateReport(report, c.client) _, err = reportutils.UpdateReport(ctx, report, c.client)
return err return err
} }
// cleanup old reports // cleanup old reports
@ -159,7 +159,7 @@ func (c *controller) reconcile(logger logr.Logger, key, namespace, name string)
// and were created more than five minutes ago // and were created more than five minutes ago
if !exists || !reportutils.CompareHash(meta, resource.Hash) { if !exists || !reportutils.CompareHash(meta, resource.Hash) {
if meta.GetCreationTimestamp().Add(time.Minute * 5).Before(time.Now()) { if meta.GetCreationTimestamp().Add(time.Minute * 5).Before(time.Now()) {
return c.deleteReport(namespace, name) return c.deleteReport(ctx, namespace, name)
} }
} }
return nil return nil

View file

@ -81,14 +81,14 @@ func NewController(
return &c return &c
} }
func (c *controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(ctx context.Context) {
controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh) controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
} }
func (c *controller) listAdmissionReports(namespace string) ([]kyvernov1alpha2.ReportInterface, error) { func (c *controller) listAdmissionReports(ctx context.Context, namespace string) ([]kyvernov1alpha2.ReportInterface, error) {
var reports []kyvernov1alpha2.ReportInterface var reports []kyvernov1alpha2.ReportInterface
if namespace == "" { if namespace == "" {
cadms, err := c.client.KyvernoV1alpha2().ClusterAdmissionReports().List(context.TODO(), metav1.ListOptions{}) cadms, err := c.client.KyvernoV1alpha2().ClusterAdmissionReports().List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -96,7 +96,7 @@ func (c *controller) listAdmissionReports(namespace string) ([]kyvernov1alpha2.R
reports = append(reports, &cadms.Items[i]) reports = append(reports, &cadms.Items[i])
} }
} else { } else {
adms, err := c.client.KyvernoV1alpha2().AdmissionReports(namespace).List(context.TODO(), metav1.ListOptions{}) adms, err := c.client.KyvernoV1alpha2().AdmissionReports(namespace).List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -107,10 +107,10 @@ func (c *controller) listAdmissionReports(namespace string) ([]kyvernov1alpha2.R
return reports, nil return reports, nil
} }
func (c *controller) listBackgroundScanReports(namespace string) ([]kyvernov1alpha2.ReportInterface, error) { func (c *controller) listBackgroundScanReports(ctx context.Context, namespace string) ([]kyvernov1alpha2.ReportInterface, error) {
var reports []kyvernov1alpha2.ReportInterface var reports []kyvernov1alpha2.ReportInterface
if namespace == "" { if namespace == "" {
cbgscans, err := c.client.KyvernoV1alpha2().ClusterBackgroundScanReports().List(context.TODO(), metav1.ListOptions{}) cbgscans, err := c.client.KyvernoV1alpha2().ClusterBackgroundScanReports().List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -118,7 +118,7 @@ func (c *controller) listBackgroundScanReports(namespace string) ([]kyvernov1alp
reports = append(reports, &cbgscans.Items[i]) reports = append(reports, &cbgscans.Items[i])
} }
} else { } else {
bgscans, err := c.client.KyvernoV1alpha2().BackgroundScanReports(namespace).List(context.TODO(), metav1.ListOptions{}) bgscans, err := c.client.KyvernoV1alpha2().BackgroundScanReports(namespace).List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -129,26 +129,26 @@ func (c *controller) listBackgroundScanReports(namespace string) ([]kyvernov1alp
return reports, nil return reports, nil
} }
func (c *controller) reconcileReport(report kyvernov1alpha2.ReportInterface, namespace, name string, results ...policyreportv1alpha2.PolicyReportResult) (kyvernov1alpha2.ReportInterface, error) { func (c *controller) reconcileReport(ctx context.Context, report kyvernov1alpha2.ReportInterface, namespace, name string, results ...policyreportv1alpha2.PolicyReportResult) (kyvernov1alpha2.ReportInterface, error) {
if report == nil { if report == nil {
return reportutils.CreateReport(c.client, reportutils.NewPolicyReport(namespace, name, results...)) return reportutils.CreateReport(ctx, reportutils.NewPolicyReport(namespace, name, results...), c.client)
} }
after := reportutils.DeepCopy(report) after := reportutils.DeepCopy(report)
reportutils.SetResults(after, results...) reportutils.SetResults(after, results...)
if reflect.DeepEqual(report, after) { if reflect.DeepEqual(report, after) {
return after, nil return after, nil
} }
return reportutils.UpdateReport(after, c.client) return reportutils.UpdateReport(ctx, after, c.client)
} }
func (c *controller) cleanReports(actual map[string]kyvernov1alpha2.ReportInterface, expected []kyvernov1alpha2.ReportInterface) error { func (c *controller) cleanReports(ctx context.Context, actual map[string]kyvernov1alpha2.ReportInterface, expected []kyvernov1alpha2.ReportInterface) error {
keep := sets.NewString() keep := sets.NewString()
for _, obj := range expected { for _, obj := range expected {
keep.Insert(obj.GetName()) keep.Insert(obj.GetName())
} }
for _, obj := range actual { for _, obj := range actual {
if !keep.Has(obj.GetName()) { if !keep.Has(obj.GetName()) {
err := reportutils.DeleteReport(obj, c.client) err := reportutils.DeleteReport(ctx, obj, c.client)
if err != nil { if err != nil {
return err return err
} }
@ -181,17 +181,17 @@ func mergeReports(accumulator map[string]policyreportv1alpha2.PolicyReportResult
} }
} }
func (c *controller) buildReportsResults(namepsace string) ([]policyreportv1alpha2.PolicyReportResult, error) { func (c *controller) buildReportsResults(ctx context.Context, namepsace string) ([]policyreportv1alpha2.PolicyReportResult, error) {
merged := map[string]policyreportv1alpha2.PolicyReportResult{} merged := map[string]policyreportv1alpha2.PolicyReportResult{}
{ {
reports, err := c.listAdmissionReports(namepsace) reports, err := c.listAdmissionReports(ctx, namepsace)
if err != nil { if err != nil {
return nil, err return nil, err
} }
mergeReports(merged, reports...) mergeReports(merged, reports...)
} }
{ {
reports, err := c.listBackgroundScanReports(namepsace) reports, err := c.listBackgroundScanReports(ctx, namepsace)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -204,10 +204,10 @@ func (c *controller) buildReportsResults(namepsace string) ([]policyreportv1alph
return results, nil return results, nil
} }
func (c *controller) getPolicyReports(namespace string) ([]kyvernov1alpha2.ReportInterface, error) { func (c *controller) getPolicyReports(ctx context.Context, namespace string) ([]kyvernov1alpha2.ReportInterface, error) {
var reports []kyvernov1alpha2.ReportInterface var reports []kyvernov1alpha2.ReportInterface
if namespace == "" { if namespace == "" {
list, err := c.client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().List(context.TODO(), metav1.ListOptions{}) list, err := c.client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -215,7 +215,7 @@ func (c *controller) getPolicyReports(namespace string) ([]kyvernov1alpha2.Repor
reports = append(reports, &list.Items[i]) reports = append(reports, &list.Items[i])
} }
} else { } else {
list, err := c.client.Wgpolicyk8sV1alpha2().PolicyReports(namespace).List(context.TODO(), metav1.ListOptions{}) list, err := c.client.Wgpolicyk8sV1alpha2().PolicyReports(namespace).List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -226,12 +226,12 @@ func (c *controller) getPolicyReports(namespace string) ([]kyvernov1alpha2.Repor
return reports, nil return reports, nil
} }
func (c *controller) reconcile(logger logr.Logger, key, _, _ string) error { func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, _, _ string) error {
results, err := c.buildReportsResults(key) results, err := c.buildReportsResults(ctx, key)
if err != nil { if err != nil {
return err return err
} }
policyReports, err := c.getPolicyReports(key) policyReports, err := c.getPolicyReports(ctx, key)
if err != nil { if err != nil {
return err return err
} }
@ -255,12 +255,12 @@ func (c *controller) reconcile(logger logr.Logger, key, _, _ string) error {
if i > 0 { if i > 0 {
name = fmt.Sprintf("%s-%d", name, i/chunkSize) name = fmt.Sprintf("%s-%d", name, i/chunkSize)
} }
report, err := c.reconcileReport(actual[name], key, name, results[i:end]...) report, err := c.reconcileReport(ctx, actual[name], key, name, results[i:end]...)
if err != nil { if err != nil {
return err return err
} }
expected = append(expected, report) expected = append(expected, report)
} }
} }
return c.cleanReports(actual, expected) return c.cleanReports(ctx, actual, expected)
} }

View file

@ -85,7 +85,7 @@ func NewController(
return &c return &c
} }
func (c *controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(ctx context.Context) {
c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, resource resource.Resource) { c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, resource resource.Resource) {
selector, err := reportutils.SelectorResourceUidEquals(uid) selector, err := reportutils.SelectorResourceUidEquals(uid)
if err != nil { if err != nil {
@ -100,7 +100,7 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.queue.Add(resource.Namespace + "/" + string(uid)) c.queue.Add(resource.Namespace + "/" + string(uid))
} }
}) })
controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh) controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
} }
func (c *controller) addPolicy(obj interface{}) { func (c *controller) addPolicy(obj interface{}) {
@ -184,7 +184,7 @@ func (c *controller) fetchPolicies(logger logr.Logger, namespace string) ([]kyve
return policies, nil return policies, nil
} }
func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKind, resource resource.Resource) error { func (c *controller) updateReport(ctx context.Context, meta metav1.Object, gvk schema.GroupVersionKind, resource resource.Resource) error {
namespace := meta.GetNamespace() namespace := meta.GetNamespace()
labels := meta.GetLabels() labels := meta.GetLabels()
// load all policies // load all policies
@ -207,7 +207,7 @@ func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKin
// if the resource changed, we need to rebuild the report // if the resource changed, we need to rebuild the report
if !reportutils.CompareHash(meta, resource.Hash) { if !reportutils.CompareHash(meta, resource.Hash) {
scanner := utils.NewScanner(logger, c.client) scanner := utils.NewScanner(logger, c.client)
before, err := c.getReport(meta.GetNamespace(), meta.GetName()) before, err := c.getReport(ctx, meta.GetNamespace(), meta.GetName())
if err != nil { if err != nil {
return nil return nil
} }
@ -240,7 +240,7 @@ func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKin
if reflect.DeepEqual(before, report) { if reflect.DeepEqual(before, report) {
return nil return nil
} }
_, err = reportutils.UpdateReport(report, c.kyvernoClient) _, err = reportutils.UpdateReport(ctx, report, c.kyvernoClient)
return err return err
} else { } else {
expected := map[string]kyvernov1.PolicyInterface{} expected := map[string]kyvernov1.PolicyInterface{}
@ -275,7 +275,7 @@ func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKin
if len(toDelete) == 0 && len(toCreate) == 0 { if len(toDelete) == 0 && len(toCreate) == 0 {
return nil return nil
} }
before, err := c.getReport(meta.GetNamespace(), meta.GetName()) before, err := c.getReport(ctx, meta.GetNamespace(), meta.GetName())
if err != nil { if err != nil {
return err return err
} }
@ -319,16 +319,16 @@ func (c *controller) updateReport(meta metav1.Object, gvk schema.GroupVersionKin
if reflect.DeepEqual(before, report) { if reflect.DeepEqual(before, report) {
return nil return nil
} }
_, err = reportutils.UpdateReport(report, c.kyvernoClient) _, err = reportutils.UpdateReport(ctx, report, c.kyvernoClient)
return err return err
} }
} }
func (c *controller) getReport(namespace, name string) (kyvernov1alpha2.ReportInterface, error) { func (c *controller) getReport(ctx context.Context, namespace, name string) (kyvernov1alpha2.ReportInterface, error) {
if namespace == "" { if namespace == "" {
return c.kyvernoClient.KyvernoV1alpha2().ClusterBackgroundScanReports().Get(context.TODO(), name, metav1.GetOptions{}) return c.kyvernoClient.KyvernoV1alpha2().ClusterBackgroundScanReports().Get(ctx, name, metav1.GetOptions{})
} else { } else {
return c.kyvernoClient.KyvernoV1alpha2().BackgroundScanReports(namespace).Get(context.TODO(), name, metav1.GetOptions{}) return c.kyvernoClient.KyvernoV1alpha2().BackgroundScanReports(namespace).Get(ctx, name, metav1.GetOptions{})
} }
} }
@ -348,7 +348,7 @@ func (c *controller) getMeta(namespace, name string) (metav1.Object, error) {
} }
} }
func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
// try to find resource from the cache // try to find resource from the cache
uid := types.UID(name) uid := types.UID(name)
resource, gvk, exists := c.metadataCache.GetResourceHash(uid) resource, gvk, exists := c.metadataCache.GetResourceHash(uid)
@ -360,10 +360,10 @@ func (c *controller) reconcile(logger logr.Logger, key, namespace, name string)
if err != nil { if err != nil {
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
// if there's no report yet, try to create an empty one // if there's no report yet, try to create an empty one
_, err = reportutils.CreateReport(c.kyvernoClient, reportutils.NewBackgroundScanReport(namespace, name, gvk, resource.Name, uid)) _, err = reportutils.CreateReport(ctx, reportutils.NewBackgroundScanReport(namespace, name, gvk, resource.Name, uid), c.kyvernoClient)
return err return err
} }
return err return err
} }
return c.updateReport(report, gvk, resource) return c.updateReport(ctx, report, gvk, resource)
} }

View file

@ -84,8 +84,8 @@ func NewController(
return &c return &c
} }
func (c *controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(ctx context.Context) {
controllerutils.Run(controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile, stopCh) controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
} }
func (c *controller) GetResourceHash(uid types.UID) (Resource, schema.GroupVersionKind, bool) { func (c *controller) GetResourceHash(uid types.UID) (Resource, schema.GroupVersionKind, bool) {
@ -110,7 +110,7 @@ func (c *controller) AddEventHandler(eventHandler EventHandler) {
} }
} }
func (c *controller) updateDynamicWatchers() error { func (c *controller) updateDynamicWatchers(ctx context.Context) error {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
clusterPolicies, err := c.fetchClusterPolicies(logger) clusterPolicies, err := c.fetchClusterPolicies(logger)
@ -139,7 +139,7 @@ func (c *controller) updateDynamicWatchers() error {
delete(c.dynamicWatchers, gvr) delete(c.dynamicWatchers, gvr)
} else { } else {
logger.Info("start watcher ...", "gvr", gvr) logger.Info("start watcher ...", "gvr", gvr)
watchInterface, err := c.client.GetDynamicInterface().Resource(gvr).Watch(context.TODO(), metav1.ListOptions{}) watchInterface, err := c.client.GetDynamicInterface().Resource(gvr).Watch(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
logger.Error(err, "failed to create watcher", "gvr", gvr) logger.Error(err, "failed to create watcher", "gvr", gvr)
} else { } else {
@ -162,7 +162,7 @@ func (c *controller) updateDynamicWatchers() error {
} }
} }
}() }()
objs, err := c.client.GetDynamicInterface().Resource(gvr).List(context.TODO(), metav1.ListOptions{}) objs, err := c.client.GetDynamicInterface().Resource(gvr).List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
logger.Error(err, "failed to list resources", "gvr", gvr) logger.Error(err, "failed to list resources", "gvr", gvr)
watchInterface.Stop() watchInterface.Stop()
@ -251,6 +251,6 @@ func (c *controller) fetchPolicies(logger logr.Logger, namespace string) ([]kyve
return policies, nil return policies, nil
} }
func (c *controller) reconcile(logger logr.Logger, key, namespace, name string) error { func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
return c.updateDynamicWatchers() return c.updateDynamicWatchers(ctx)
} }

View file

@ -13,18 +13,18 @@ import (
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
type reconcileFunc func(logger logr.Logger, key string, namespace string, name string) error type reconcileFunc func(ctx context.Context, logger logr.Logger, key string, namespace string, name string) error
func Run(controllerName string, logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) { func Run(ctx context.Context, controllerName string, logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, cacheSyncs ...cache.InformerSynced) {
logger.Info("starting ...") logger.Info("starting ...")
defer runtime.HandleCrash() defer runtime.HandleCrash()
defer logger.Info("stopped") defer logger.Info("stopped")
var wg sync.WaitGroup var wg sync.WaitGroup
func() { func() {
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
defer queue.ShutDown() defer queue.ShutDown()
if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), cacheSyncs...) {
return return
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@ -33,24 +33,24 @@ func Run(controllerName string, logger logr.Logger, queue workqueue.RateLimiting
logger.Info("starting worker") logger.Info("starting worker")
defer wg.Done() defer wg.Done()
defer logger.Info("worker stopped") defer logger.Info("worker stopped")
wait.Until(func() { worker(logger, queue, maxRetries, r) }, time.Second, ctx.Done()) wait.UntilWithContext(ctx, func(ctx context.Context) { worker(ctx, logger, queue, maxRetries, r) }, time.Second)
}(logger.WithValues("id", i)) }(logger.WithValues("id", i))
} }
<-stopCh <-ctx.Done()
}() }()
logger.Info("waiting for workers to terminate ...") logger.Info("waiting for workers to terminate ...")
wg.Wait() wg.Wait()
} }
func worker(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) { func worker(ctx context.Context, logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) {
for processNextWorkItem(logger, queue, maxRetries, r) { for processNextWorkItem(ctx, logger, queue, maxRetries, r) {
} }
} }
func processNextWorkItem(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) bool { func processNextWorkItem(ctx context.Context, logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) bool {
if obj, quit := queue.Get(); !quit { if obj, quit := queue.Get(); !quit {
defer queue.Done(obj) defer queue.Done(obj)
handleErr(logger, queue, maxRetries, reconcile(logger, obj, r), obj) handleErr(logger, queue, maxRetries, reconcile(ctx, logger, obj, r), obj)
return true return true
} }
return false return false
@ -71,7 +71,7 @@ func handleErr(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRet
} }
} }
func reconcile(logger logr.Logger, obj interface{}, r reconcileFunc) error { func reconcile(ctx context.Context, logger logr.Logger, obj interface{}, r reconcileFunc) error {
start := time.Now() start := time.Now()
var k, ns, n string var k, ns, n string
if key, ok := obj.(cache.ExplicitKey); ok { if key, ok := obj.(cache.ExplicitKey); ok {
@ -87,5 +87,5 @@ func reconcile(logger logr.Logger, obj interface{}, r reconcileFunc) error {
logger = logger.WithValues("key", k, "namespace", ns, "name", n) logger = logger.WithValues("key", k, "namespace", ns, "name", n)
logger.Info("reconciling ...") logger.Info("reconciling ...")
defer logger.Info("done", time.Since(start)) defer logger.Info("done", time.Since(start))
return r(logger, k, ns, n) return r(ctx, logger, k, ns, n)
} }

View file

@ -105,7 +105,7 @@ func GetOrNew[T any, R Object[T], G Getter[R]](name string, getter G) (R, error)
return obj, nil return obj, nil
} }
func CreateOrUpdate[T any, R Object[T], G Getter[R], S Setter[R]](name string, getter G, setter S, build func(R) error) (R, error) { func CreateOrUpdate[T any, R Object[T], G Getter[R], S Setter[R]](ctx context.Context, name string, getter G, setter S, build func(R) error) (R, error) {
if obj, err := GetOrNew[T, R](name, getter); err != nil { if obj, err := GetOrNew[T, R](name, getter); err != nil {
return nil, err return nil, err
} else { } else {
@ -114,19 +114,19 @@ func CreateOrUpdate[T any, R Object[T], G Getter[R], S Setter[R]](name string, g
return nil, err return nil, err
} else { } else {
if obj.GetResourceVersion() == "" { if obj.GetResourceVersion() == "" {
return setter.Create(context.TODO(), mutated, metav1.CreateOptions{}) return setter.Create(ctx, mutated, metav1.CreateOptions{})
} else { } else {
if reflect.DeepEqual(obj, mutated) { if reflect.DeepEqual(obj, mutated) {
return mutated, nil return mutated, nil
} else { } else {
return setter.Update(context.TODO(), mutated, metav1.UpdateOptions{}) return setter.Update(ctx, mutated, metav1.UpdateOptions{})
} }
} }
} }
} }
} }
func Update[T any, R Object[T], S Setter[R]](setter S, obj R, build func(R) error) (R, error) { func Update[T any, R Object[T], S Setter[R]](ctx context.Context, setter S, obj R, build func(R) error) (R, error) {
mutated := obj.DeepCopy() mutated := obj.DeepCopy()
if err := build(mutated); err != nil { if err := build(mutated); err != nil {
return nil, err return nil, err
@ -134,19 +134,19 @@ func Update[T any, R Object[T], S Setter[R]](setter S, obj R, build func(R) erro
if reflect.DeepEqual(obj, mutated) { if reflect.DeepEqual(obj, mutated) {
return mutated, nil return mutated, nil
} else { } else {
return setter.Update(context.TODO(), mutated, metav1.UpdateOptions{}) return setter.Update(ctx, mutated, metav1.UpdateOptions{})
} }
} }
} }
func Cleanup[T any, R Object[T]](actual []R, expected []R, deleter Deleter) error { func Cleanup[T any, R Object[T]](ctx context.Context, actual []R, expected []R, deleter Deleter) error {
keep := sets.NewString() keep := sets.NewString()
for _, obj := range expected { for _, obj := range expected {
keep.Insert(obj.GetName()) keep.Insert(obj.GetName())
} }
for _, obj := range actual { for _, obj := range actual {
if !keep.Has(obj.GetName()) { if !keep.Has(obj.GetName()) {
if err := deleter.Delete(context.TODO(), obj.GetName(), metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { if err := deleter.Delete(ctx, obj.GetName(), metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return err return err
} }
} }

View file

@ -10,25 +10,25 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
func CreateReport(client versioned.Interface, report kyvernov1alpha2.ReportInterface) (kyvernov1alpha2.ReportInterface, error) { func CreateReport(ctx context.Context, report kyvernov1alpha2.ReportInterface, client versioned.Interface) (kyvernov1alpha2.ReportInterface, error) {
switch v := report.(type) { switch v := report.(type) {
case *kyvernov1alpha2.AdmissionReport: case *kyvernov1alpha2.AdmissionReport:
report, err := client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Create(context.TODO(), v, metav1.CreateOptions{}) report, err := client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Create(ctx, v, metav1.CreateOptions{})
return report, err return report, err
case *kyvernov1alpha2.ClusterAdmissionReport: case *kyvernov1alpha2.ClusterAdmissionReport:
report, err := client.KyvernoV1alpha2().ClusterAdmissionReports().Create(context.TODO(), v, metav1.CreateOptions{}) report, err := client.KyvernoV1alpha2().ClusterAdmissionReports().Create(ctx, v, metav1.CreateOptions{})
return report, err return report, err
case *kyvernov1alpha2.BackgroundScanReport: case *kyvernov1alpha2.BackgroundScanReport:
report, err := client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Create(context.TODO(), v, metav1.CreateOptions{}) report, err := client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Create(ctx, v, metav1.CreateOptions{})
return report, err return report, err
case *kyvernov1alpha2.ClusterBackgroundScanReport: case *kyvernov1alpha2.ClusterBackgroundScanReport:
report, err := client.KyvernoV1alpha2().ClusterBackgroundScanReports().Create(context.TODO(), v, metav1.CreateOptions{}) report, err := client.KyvernoV1alpha2().ClusterBackgroundScanReports().Create(ctx, v, metav1.CreateOptions{})
return report, err return report, err
case *policyreportv1alpha2.PolicyReport: case *policyreportv1alpha2.PolicyReport:
report, err := client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Create(context.TODO(), v, metav1.CreateOptions{}) report, err := client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Create(ctx, v, metav1.CreateOptions{})
return report, err return report, err
case *policyreportv1alpha2.ClusterPolicyReport: case *policyreportv1alpha2.ClusterPolicyReport:
report, err := client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Create(context.TODO(), v, metav1.CreateOptions{}) report, err := client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Create(ctx, v, metav1.CreateOptions{})
return report, err return report, err
default: default:
return nil, errors.New("unknow type") return nil, errors.New("unknow type")

View file

@ -10,20 +10,20 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
func DeleteReport(report kyvernov1alpha2.ReportInterface, client versioned.Interface) error { func DeleteReport(ctx context.Context, report kyvernov1alpha2.ReportInterface, client versioned.Interface) error {
switch v := report.(type) { switch v := report.(type) {
case *kyvernov1alpha2.AdmissionReport: case *kyvernov1alpha2.AdmissionReport:
return client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) return client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Delete(ctx, v.GetName(), metav1.DeleteOptions{})
case *kyvernov1alpha2.ClusterAdmissionReport: case *kyvernov1alpha2.ClusterAdmissionReport:
return client.KyvernoV1alpha2().ClusterAdmissionReports().Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) return client.KyvernoV1alpha2().ClusterAdmissionReports().Delete(ctx, v.GetName(), metav1.DeleteOptions{})
case *kyvernov1alpha2.BackgroundScanReport: case *kyvernov1alpha2.BackgroundScanReport:
return client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) return client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Delete(ctx, v.GetName(), metav1.DeleteOptions{})
case *kyvernov1alpha2.ClusterBackgroundScanReport: case *kyvernov1alpha2.ClusterBackgroundScanReport:
return client.KyvernoV1alpha2().ClusterBackgroundScanReports().Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) return client.KyvernoV1alpha2().ClusterBackgroundScanReports().Delete(ctx, v.GetName(), metav1.DeleteOptions{})
case *policyreportv1alpha2.PolicyReport: case *policyreportv1alpha2.PolicyReport:
return client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) return client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Delete(ctx, v.GetName(), metav1.DeleteOptions{})
case *policyreportv1alpha2.ClusterPolicyReport: case *policyreportv1alpha2.ClusterPolicyReport:
return client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Delete(context.TODO(), v.GetName(), metav1.DeleteOptions{}) return client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Delete(ctx, v.GetName(), metav1.DeleteOptions{})
default: default:
return errors.New("unknow type") return errors.New("unknow type")
} }

View file

@ -10,25 +10,25 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
func UpdateReport(report kyvernov1alpha2.ReportInterface, client versioned.Interface) (kyvernov1alpha2.ReportInterface, error) { func UpdateReport(ctx context.Context, report kyvernov1alpha2.ReportInterface, client versioned.Interface) (kyvernov1alpha2.ReportInterface, error) {
switch v := report.(type) { switch v := report.(type) {
case *kyvernov1alpha2.AdmissionReport: case *kyvernov1alpha2.AdmissionReport:
report, err := client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Update(context.TODO(), v, metav1.UpdateOptions{}) report, err := client.KyvernoV1alpha2().AdmissionReports(report.GetNamespace()).Update(ctx, v, metav1.UpdateOptions{})
return report, err return report, err
case *kyvernov1alpha2.ClusterAdmissionReport: case *kyvernov1alpha2.ClusterAdmissionReport:
report, err := client.KyvernoV1alpha2().ClusterAdmissionReports().Update(context.TODO(), v, metav1.UpdateOptions{}) report, err := client.KyvernoV1alpha2().ClusterAdmissionReports().Update(ctx, v, metav1.UpdateOptions{})
return report, err return report, err
case *kyvernov1alpha2.BackgroundScanReport: case *kyvernov1alpha2.BackgroundScanReport:
report, err := client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Update(context.TODO(), v, metav1.UpdateOptions{}) report, err := client.KyvernoV1alpha2().BackgroundScanReports(report.GetNamespace()).Update(ctx, v, metav1.UpdateOptions{})
return report, err return report, err
case *kyvernov1alpha2.ClusterBackgroundScanReport: case *kyvernov1alpha2.ClusterBackgroundScanReport:
report, err := client.KyvernoV1alpha2().ClusterBackgroundScanReports().Update(context.TODO(), v, metav1.UpdateOptions{}) report, err := client.KyvernoV1alpha2().ClusterBackgroundScanReports().Update(ctx, v, metav1.UpdateOptions{})
return report, err return report, err
case *policyreportv1alpha2.PolicyReport: case *policyreportv1alpha2.PolicyReport:
report, err := client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Update(context.TODO(), v, metav1.UpdateOptions{}) report, err := client.Wgpolicyk8sV1alpha2().PolicyReports(report.GetNamespace()).Update(ctx, v, metav1.UpdateOptions{})
return report, err return report, err
case *policyreportv1alpha2.ClusterPolicyReport: case *policyreportv1alpha2.ClusterPolicyReport:
report, err := client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Update(context.TODO(), v, metav1.UpdateOptions{}) report, err := client.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Update(ctx, v, metav1.UpdateOptions{})
return report, err return report, err
default: default:
return nil, errors.New("unknow type") return nil, errors.New("unknow type")

View file

@ -1,6 +1,7 @@
package validation package validation
import ( import (
"context"
"reflect" "reflect"
"time" "time"
@ -177,7 +178,7 @@ func (v *validationHandler) handleAudit(
gv := metav1.GroupVersion{Group: request.Kind.Group, Version: request.Kind.Version} gv := metav1.GroupVersion{Group: request.Kind.Group, Version: request.Kind.Version}
controllerutils.SetOwner(report, gv.String(), request.Kind.Kind, resource.GetName(), resource.GetUID()) controllerutils.SetOwner(report, gv.String(), request.Kind.Kind, resource.GetName(), resource.GetUID())
} }
_, err = reportutils.CreateReport(v.kyvernoClient, report) _, err = reportutils.CreateReport(context.Background(), report, v.kyvernoClient)
if err != nil { if err != nil {
v.log.Error(err, "failed to create report") v.log.Error(err, "failed to create report")
} }