diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 112fcbae26..5c7adf2998 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -44,6 +44,7 @@ import ( kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/klog/v2/klogr" "sigs.k8s.io/controller-runtime/pkg/log" @@ -365,7 +366,11 @@ func main() { setupLog.Error(err, "tls initialization error") os.Exit(1) } - waitForCacheSync(stopCh, kyvernoInformer, kubeInformer, kubeKyvernoInformer) + // wait for cache to be synced before use it + cache.WaitForCacheSync(stopCh, + kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations().Informer().HasSynced, + kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced, + ) // validate the ConfigMap format if err := webhookCfg.ValidateWebhookConfigurations(config.KyvernoNamespace(), config.KyvernoConfigMapName()); err != nil { diff --git a/pkg/background/generate/cleanup/controller.go b/pkg/background/generate/cleanup/controller.go index 7044ad3548..d19be4766c 100644 --- a/pkg/background/generate/cleanup/controller.go +++ b/pkg/background/generate/cleanup/controller.go @@ -53,6 +53,8 @@ type controller struct { urLister kyvernov1beta1listers.UpdateRequestNamespaceLister nsLister corev1listers.NamespaceLister + informersSynced []cache.InformerSynced + // queue queue workqueue.RateLimitingInterface } @@ -67,7 +69,7 @@ func NewController( urInformer kyvernov1beta1informers.UpdateRequestInformer, namespaceInformer corev1informers.NamespaceInformer, ) Controller { - return &controller{ + c := &controller{ client: client, kyvernoClient: kyvernoclient, pInformer: pInformer, @@ -78,6 +80,9 @@ func NewController( nsLister: namespaceInformer.Lister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "generate-request-cleanup"), } + + c.informersSynced = []cache.InformerSynced{pInformer.Informer().HasSynced, npInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaceInformer.Informer().HasSynced} + return c } func (c *controller) Run(workers int, stopCh <-chan struct{}) { @@ -85,6 +90,11 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) { defer c.queue.ShutDown() logger.Info("starting") defer logger.Info("shutting down") + + if !cache.WaitForNamedCacheSync("generate-request-cleanup", stopCh, c.informersSynced...) { + return + } + c.pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: c.deletePolicy, // we only cleanup if the policy is delete }) diff --git a/pkg/background/update_request_controller.go b/pkg/background/update_request_controller.go index d1a924587d..20959fe324 100644 --- a/pkg/background/update_request_controller.go +++ b/pkg/background/update_request_controller.go @@ -53,6 +53,8 @@ type controller struct { nsLister corev1listers.NamespaceLister podLister corev1listers.PodLister + informersSynced []cache.InformerSynced + // queue queue workqueue.RateLimitingInterface @@ -82,7 +84,7 @@ func NewController( urLister: urLister, nsLister: namespaceInformer.Lister(), podLister: podInformer.Lister(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "generate-request"), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "update-request"), eventGen: eventGen, configuration: dynamicConfig, } @@ -99,6 +101,9 @@ func NewController( UpdateFunc: c.updatePolicy, DeleteFunc: c.deletePolicy, }) + + c.informersSynced = []cache.InformerSynced{cpolInformer.Informer().HasSynced, polInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaceInformer.Informer().HasSynced, podInformer.Informer().HasSynced} + return &c } @@ -109,6 +114,10 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) { logger.Info("starting") defer logger.Info("shutting down") + if !cache.WaitForNamedCacheSync("background", stopCh, c.informersSynced...) { + return + } + for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) } diff --git a/pkg/controllers/certmanager/controller.go b/pkg/controllers/certmanager/controller.go index 97fe23224f..9db0375c17 100644 --- a/pkg/controllers/certmanager/controller.go +++ b/pkg/controllers/certmanager/controller.go @@ -23,8 +23,10 @@ type Controller interface { } type controller struct { - renewer *tls.CertRenewer - secretLister corev1listers.SecretLister + renewer *tls.CertRenewer + secretLister corev1listers.SecretLister + // secretSynced returns true if the secret shared informer has synced at least once + secretSynced cache.InformerSynced secretQueue chan bool onSecretChanged func() error } @@ -33,6 +35,7 @@ func NewController(secretInformer corev1informers.SecretInformer, certRenewer *t manager := &controller{ renewer: certRenewer, secretLister: secretInformer.Lister(), + secretSynced: secretInformer.Informer().HasSynced, secretQueue: make(chan bool, 1), onSecretChanged: onSecretChanged, } diff --git a/pkg/controllers/config/controller.go b/pkg/controllers/config/controller.go index 265b801309..c835b0b4ce 100644 --- a/pkg/controllers/config/controller.go +++ b/pkg/controllers/config/controller.go @@ -6,6 +6,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" corev1informers "k8s.io/client-go/informers/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -20,6 +21,9 @@ type controller struct { // listers configmapLister corev1listers.ConfigMapLister + // configmapSynced returns true if the configmap shared informer has synced at least once + configmapSynced cache.InformerSynced + // queue queue workqueue.RateLimitingInterface } @@ -30,12 +34,14 @@ func NewController(configuration config.Configuration, configmapInformer corev1i configmapLister: configmapInformer.Lister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "config-controller"), } + + c.configmapSynced = configmapInformer.Informer().HasSynced controllerutils.AddDefaultEventHandlers(logger, configmapInformer.Informer(), c.queue) return &c } func (c *controller) Run(stopCh <-chan struct{}) { - controllerutils.Run(logger, c.queue, workers, maxRetries, c.reconcile, stopCh) + controllerutils.Run(controllerName, logger, c.queue, workers, maxRetries, c.reconcile, stopCh, c.configmapSynced) } func (c *controller) reconcile(key, namespace, name string) error { diff --git a/pkg/controllers/config/log.go b/pkg/controllers/config/log.go index 666545c526..efb4311a63 100644 --- a/pkg/controllers/config/log.go +++ b/pkg/controllers/config/log.go @@ -2,4 +2,5 @@ package config import "sigs.k8s.io/controller-runtime/pkg/log" -var logger = log.Log.WithName("config-controller") +var controllerName = "config-controller" +var logger = log.Log.WithName(controllerName) diff --git a/pkg/controllers/policycache/controller.go b/pkg/controllers/policycache/controller.go index a8a72178d7..dd98b4f7e3 100644 --- a/pkg/controllers/policycache/controller.go +++ b/pkg/controllers/policycache/controller.go @@ -25,6 +25,11 @@ type controller struct { cpolLister kyvernov1listers.ClusterPolicyLister polLister kyvernov1listers.PolicyLister + // cpolSynced returns true if the cluster policy shared informer has synced at least once + cpolSynced cache.InformerSynced + // polSynced returns true if the policy shared informer has synced at least once + polSynced cache.InformerSynced + // queue queue workqueue.RateLimitingInterface } @@ -34,6 +39,8 @@ func NewController(pcache pcache.Cache, cpolInformer kyvernov1informers.ClusterP cache: pcache, cpolLister: cpolInformer.Lister(), polLister: polInformer.Lister(), + cpolSynced: cpolInformer.Informer().HasSynced, + polSynced: polInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policycache-controller"), } controllerutils.AddDefaultEventHandlers(logger, cpolInformer.Informer(), c.queue) @@ -44,6 +51,7 @@ func NewController(pcache pcache.Cache, cpolInformer kyvernov1informers.ClusterP func (c *controller) WarmUp() error { logger.Info("warming up ...") defer logger.Info("warm up done") + pols, err := c.polLister.Policies(metav1.NamespaceAll).List(labels.Everything()) if err != nil { return err @@ -70,7 +78,7 @@ func (c *controller) WarmUp() error { } func (c *controller) Run(stopCh <-chan struct{}) { - controllerutils.Run(logger, c.queue, workers, maxRetries, c.reconcile, stopCh) + controllerutils.Run("policycache-controller", logger, c.queue, workers, maxRetries, c.reconcile, stopCh, c.cpolSynced, c.polSynced) } func (c *controller) reconcile(key, namespace, name string) error { diff --git a/pkg/policy/policy_controller.go b/pkg/policy/policy_controller.go index 4760f547bc..6013429052 100644 --- a/pkg/policy/policy_controller.go +++ b/pkg/policy/policy_controller.go @@ -78,6 +78,8 @@ type PolicyController struct { // nsLister can list/get namespaces from the shared informer's store nsLister corev1listers.NamespaceLister + informersSynced []cache.InformerSynced + // Resource manager, manages the mapping for already processed resource rm resourceManager @@ -140,10 +142,10 @@ func NewPolicyController( pc.pLister = pInformer.Lister() pc.npLister = npInformer.Lister() - pc.nsLister = namespaces.Lister() pc.urLister = urInformer.Lister() + pc.informersSynced = []cache.InformerSynced{pInformer.Informer().HasSynced, npInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaces.Informer().HasSynced} // resource manager // rebuild after 300 seconds/ 5 mins pc.rm = NewResourceManager(30) @@ -414,6 +416,10 @@ func (pc *PolicyController) Run(workers int, reconcileCh <-chan bool, stopCh <-c logger.Info("starting") defer logger.Info("shutting down") + if !cache.WaitForNamedCacheSync("PolicyController", stopCh, pc.informersSynced...) { + return + } + pc.pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: pc.addPolicy, UpdateFunc: pc.updatePolicy, diff --git a/pkg/policyreport/reportcontroller.go b/pkg/policyreport/reportcontroller.go index aa75ccffed..8bf468e447 100644 --- a/pkg/policyreport/reportcontroller.go +++ b/pkg/policyreport/reportcontroller.go @@ -64,6 +64,8 @@ type ReportGenerator struct { clusterReportChangeRequestLister kyvernov1alpha2listers.ClusterReportChangeRequestLister nsLister corev1listers.NamespaceLister + informersSynced []cache.InformerSynced + queue workqueue.RateLimitingInterface // ReconcileCh sends a signal to policy controller to force the reconciliation of policy report @@ -102,6 +104,7 @@ func NewReportGenerator( gen.reportChangeRequestLister = reportReqInformer.Lister() gen.nsLister = namespace.Lister() + gen.informersSynced = []cache.InformerSynced{clusterReportInformer.Informer().HasSynced, reportInformer.Informer().HasSynced, reportReqInformer.Informer().HasSynced, clusterReportInformer.Informer().HasSynced, namespace.Informer().HasSynced} return gen, nil } @@ -231,6 +234,10 @@ func (g *ReportGenerator) Run(workers int, stopCh <-chan struct{}) { logger.Info("start") defer logger.Info("shutting down") + if !cache.WaitForNamedCacheSync("PolicyReportGenerator", stopCh, g.informersSynced...) { + return + } + g.reportReqInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: g.addReportChangeRequest, diff --git a/pkg/policyreport/reportrequest.go b/pkg/policyreport/reportrequest.go index 686811506f..74fc53da52 100644 --- a/pkg/policyreport/reportrequest.go +++ b/pkg/policyreport/reportrequest.go @@ -19,6 +19,7 @@ import ( "github.com/kyverno/kyverno/pkg/engine/response" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -41,6 +42,8 @@ type Generator struct { // polLister can list/get namespace policy from the shared informer's store polLister kyvernov1listers.PolicyLister + informersSynced []cache.InformerSynced + queue workqueue.RateLimitingInterface dataStore *dataStore @@ -70,6 +73,7 @@ func NewReportChangeRequestGenerator(client kyvernoclient.Interface, log: log, } + gen.informersSynced = []cache.InformerSynced{clusterReportReqInformer.Informer().HasSynced, reportReqInformer.Informer().HasSynced, cpolInformer.Informer().HasSynced, polInformer.Informer().HasSynced} return &gen } @@ -164,6 +168,10 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { logger.Info("start") defer logger.Info("shutting down") + if !cache.WaitForNamedCacheSync("requestCreator", stopCh, gen.informersSynced...) { + return + } + for i := 0; i < workers; i++ { go wait.Until(gen.runWorker, time.Second, stopCh) } diff --git a/pkg/utils/controller/run.go b/pkg/utils/controller/run.go index ed1d665fb1..cc35a87da3 100644 --- a/pkg/utils/controller/run.go +++ b/pkg/utils/controller/run.go @@ -13,10 +13,15 @@ import ( type reconcileFunc func(string, string, string) error -func Run(logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, stopCh <-chan struct{}) { +func Run(controllerName string, logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) { defer runtime.HandleCrash() logger.Info("starting ...") defer logger.Info("shutting down") + + if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { + return + } + for i := 0; i < n; i++ { go wait.Until(func() { worker(logger, queue, maxRetries, r) }, time.Second, stopCh) } diff --git a/pkg/webhooks/resource/validate_audit.go b/pkg/webhooks/resource/validate_audit.go index d07ac13762..31b7914e33 100644 --- a/pkg/webhooks/resource/validate_audit.go +++ b/pkg/webhooks/resource/validate_audit.go @@ -24,6 +24,7 @@ import ( rbacv1informers "k8s.io/client-go/informers/rbac/v1" corev1listers "k8s.io/client-go/listers/core/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -52,6 +53,8 @@ type auditHandler struct { crbLister rbacv1listers.ClusterRoleBindingLister nsLister corev1listers.NamespaceLister + informersSynced []cache.InformerSynced + log logr.Logger configHandler config.Configuration promConfig *metrics.PromConfig @@ -69,7 +72,7 @@ func NewValidateAuditHandler(pCache policycache.Cache, client dclient.Interface, promConfig *metrics.PromConfig, ) AuditHandler { - return &auditHandler{ + c := &auditHandler{ pCache: pCache, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName), eventGen: eventGen, @@ -82,6 +85,8 @@ func NewValidateAuditHandler(pCache policycache.Cache, client: client, promConfig: promConfig, } + c.informersSynced = []cache.InformerSynced{rbInformer.Informer().HasSynced, crbInformer.Informer().HasSynced, namespaces.Informer().HasSynced} + return c } func (h *auditHandler) Add(request *admissionv1.AdmissionRequest) { @@ -97,6 +102,10 @@ func (h *auditHandler) Run(workers int, stopCh <-chan struct{}) { h.log.V(4).Info("shutting down") }() + if !cache.WaitForNamedCacheSync("ValidateAuditHandler", stopCh, h.informersSynced...) { + return + } + for i := 0; i < workers; i++ { go wait.Until(h.runWorker, time.Second, stopCh) }