1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 02:18:15 +00:00

Wait for informers' cache to be synced before starting controllers (#4155)

Signed-off-by: ShutingZhao <shuting@nirmata.com>
This commit is contained in:
shuting 2022-06-28 12:55:52 +08:00 committed by GitHub
parent 47b1266503
commit cd2d89bf55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 88 additions and 11 deletions

View file

@ -44,6 +44,7 @@ import (
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
"sigs.k8s.io/controller-runtime/pkg/log"
@ -365,7 +366,11 @@ func main() {
setupLog.Error(err, "tls initialization error")
os.Exit(1)
}
waitForCacheSync(stopCh, kyvernoInformer, kubeInformer, kubeKyvernoInformer)
// wait for cache to be synced before use it
cache.WaitForCacheSync(stopCh,
kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations().Informer().HasSynced,
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced,
)
// validate the ConfigMap format
if err := webhookCfg.ValidateWebhookConfigurations(config.KyvernoNamespace(), config.KyvernoConfigMapName()); err != nil {

View file

@ -53,6 +53,8 @@ type controller struct {
urLister kyvernov1beta1listers.UpdateRequestNamespaceLister
nsLister corev1listers.NamespaceLister
informersSynced []cache.InformerSynced
// queue
queue workqueue.RateLimitingInterface
}
@ -67,7 +69,7 @@ func NewController(
urInformer kyvernov1beta1informers.UpdateRequestInformer,
namespaceInformer corev1informers.NamespaceInformer,
) Controller {
return &controller{
c := &controller{
client: client,
kyvernoClient: kyvernoclient,
pInformer: pInformer,
@ -78,6 +80,9 @@ func NewController(
nsLister: namespaceInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "generate-request-cleanup"),
}
c.informersSynced = []cache.InformerSynced{pInformer.Informer().HasSynced, npInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaceInformer.Informer().HasSynced}
return c
}
func (c *controller) Run(workers int, stopCh <-chan struct{}) {
@ -85,6 +90,11 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) {
defer c.queue.ShutDown()
logger.Info("starting")
defer logger.Info("shutting down")
if !cache.WaitForNamedCacheSync("generate-request-cleanup", stopCh, c.informersSynced...) {
return
}
c.pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.deletePolicy, // we only cleanup if the policy is delete
})

View file

@ -53,6 +53,8 @@ type controller struct {
nsLister corev1listers.NamespaceLister
podLister corev1listers.PodLister
informersSynced []cache.InformerSynced
// queue
queue workqueue.RateLimitingInterface
@ -82,7 +84,7 @@ func NewController(
urLister: urLister,
nsLister: namespaceInformer.Lister(),
podLister: podInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "generate-request"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "update-request"),
eventGen: eventGen,
configuration: dynamicConfig,
}
@ -99,6 +101,9 @@ func NewController(
UpdateFunc: c.updatePolicy,
DeleteFunc: c.deletePolicy,
})
c.informersSynced = []cache.InformerSynced{cpolInformer.Informer().HasSynced, polInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaceInformer.Informer().HasSynced, podInformer.Informer().HasSynced}
return &c
}
@ -109,6 +114,10 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) {
logger.Info("starting")
defer logger.Info("shutting down")
if !cache.WaitForNamedCacheSync("background", stopCh, c.informersSynced...) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}

View file

@ -23,8 +23,10 @@ type Controller interface {
}
type controller struct {
renewer *tls.CertRenewer
secretLister corev1listers.SecretLister
renewer *tls.CertRenewer
secretLister corev1listers.SecretLister
// secretSynced returns true if the secret shared informer has synced at least once
secretSynced cache.InformerSynced
secretQueue chan bool
onSecretChanged func() error
}
@ -33,6 +35,7 @@ func NewController(secretInformer corev1informers.SecretInformer, certRenewer *t
manager := &controller{
renewer: certRenewer,
secretLister: secretInformer.Lister(),
secretSynced: secretInformer.Informer().HasSynced,
secretQueue: make(chan bool, 1),
onSecretChanged: onSecretChanged,
}

View file

@ -6,6 +6,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@ -20,6 +21,9 @@ type controller struct {
// listers
configmapLister corev1listers.ConfigMapLister
// configmapSynced returns true if the configmap shared informer has synced at least once
configmapSynced cache.InformerSynced
// queue
queue workqueue.RateLimitingInterface
}
@ -30,12 +34,14 @@ func NewController(configuration config.Configuration, configmapInformer corev1i
configmapLister: configmapInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "config-controller"),
}
c.configmapSynced = configmapInformer.Informer().HasSynced
controllerutils.AddDefaultEventHandlers(logger, configmapInformer.Informer(), c.queue)
return &c
}
func (c *controller) Run(stopCh <-chan struct{}) {
controllerutils.Run(logger, c.queue, workers, maxRetries, c.reconcile, stopCh)
controllerutils.Run(controllerName, logger, c.queue, workers, maxRetries, c.reconcile, stopCh, c.configmapSynced)
}
func (c *controller) reconcile(key, namespace, name string) error {

View file

@ -2,4 +2,5 @@ package config
import "sigs.k8s.io/controller-runtime/pkg/log"
var logger = log.Log.WithName("config-controller")
var controllerName = "config-controller"
var logger = log.Log.WithName(controllerName)

View file

@ -25,6 +25,11 @@ type controller struct {
cpolLister kyvernov1listers.ClusterPolicyLister
polLister kyvernov1listers.PolicyLister
// cpolSynced returns true if the cluster policy shared informer has synced at least once
cpolSynced cache.InformerSynced
// polSynced returns true if the policy shared informer has synced at least once
polSynced cache.InformerSynced
// queue
queue workqueue.RateLimitingInterface
}
@ -34,6 +39,8 @@ func NewController(pcache pcache.Cache, cpolInformer kyvernov1informers.ClusterP
cache: pcache,
cpolLister: cpolInformer.Lister(),
polLister: polInformer.Lister(),
cpolSynced: cpolInformer.Informer().HasSynced,
polSynced: polInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policycache-controller"),
}
controllerutils.AddDefaultEventHandlers(logger, cpolInformer.Informer(), c.queue)
@ -44,6 +51,7 @@ func NewController(pcache pcache.Cache, cpolInformer kyvernov1informers.ClusterP
func (c *controller) WarmUp() error {
logger.Info("warming up ...")
defer logger.Info("warm up done")
pols, err := c.polLister.Policies(metav1.NamespaceAll).List(labels.Everything())
if err != nil {
return err
@ -70,7 +78,7 @@ func (c *controller) WarmUp() error {
}
func (c *controller) Run(stopCh <-chan struct{}) {
controllerutils.Run(logger, c.queue, workers, maxRetries, c.reconcile, stopCh)
controllerutils.Run("policycache-controller", logger, c.queue, workers, maxRetries, c.reconcile, stopCh, c.cpolSynced, c.polSynced)
}
func (c *controller) reconcile(key, namespace, name string) error {

View file

@ -78,6 +78,8 @@ type PolicyController struct {
// nsLister can list/get namespaces from the shared informer's store
nsLister corev1listers.NamespaceLister
informersSynced []cache.InformerSynced
// Resource manager, manages the mapping for already processed resource
rm resourceManager
@ -140,10 +142,10 @@ func NewPolicyController(
pc.pLister = pInformer.Lister()
pc.npLister = npInformer.Lister()
pc.nsLister = namespaces.Lister()
pc.urLister = urInformer.Lister()
pc.informersSynced = []cache.InformerSynced{pInformer.Informer().HasSynced, npInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaces.Informer().HasSynced}
// resource manager
// rebuild after 300 seconds/ 5 mins
pc.rm = NewResourceManager(30)
@ -414,6 +416,10 @@ func (pc *PolicyController) Run(workers int, reconcileCh <-chan bool, stopCh <-c
logger.Info("starting")
defer logger.Info("shutting down")
if !cache.WaitForNamedCacheSync("PolicyController", stopCh, pc.informersSynced...) {
return
}
pc.pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pc.addPolicy,
UpdateFunc: pc.updatePolicy,

View file

@ -64,6 +64,8 @@ type ReportGenerator struct {
clusterReportChangeRequestLister kyvernov1alpha2listers.ClusterReportChangeRequestLister
nsLister corev1listers.NamespaceLister
informersSynced []cache.InformerSynced
queue workqueue.RateLimitingInterface
// ReconcileCh sends a signal to policy controller to force the reconciliation of policy report
@ -102,6 +104,7 @@ func NewReportGenerator(
gen.reportChangeRequestLister = reportReqInformer.Lister()
gen.nsLister = namespace.Lister()
gen.informersSynced = []cache.InformerSynced{clusterReportInformer.Informer().HasSynced, reportInformer.Informer().HasSynced, reportReqInformer.Informer().HasSynced, clusterReportInformer.Informer().HasSynced, namespace.Informer().HasSynced}
return gen, nil
}
@ -231,6 +234,10 @@ func (g *ReportGenerator) Run(workers int, stopCh <-chan struct{}) {
logger.Info("start")
defer logger.Info("shutting down")
if !cache.WaitForNamedCacheSync("PolicyReportGenerator", stopCh, g.informersSynced...) {
return
}
g.reportReqInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: g.addReportChangeRequest,

View file

@ -19,6 +19,7 @@ import (
"github.com/kyverno/kyverno/pkg/engine/response"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@ -41,6 +42,8 @@ type Generator struct {
// polLister can list/get namespace policy from the shared informer's store
polLister kyvernov1listers.PolicyLister
informersSynced []cache.InformerSynced
queue workqueue.RateLimitingInterface
dataStore *dataStore
@ -70,6 +73,7 @@ func NewReportChangeRequestGenerator(client kyvernoclient.Interface,
log: log,
}
gen.informersSynced = []cache.InformerSynced{clusterReportReqInformer.Informer().HasSynced, reportReqInformer.Informer().HasSynced, cpolInformer.Informer().HasSynced, polInformer.Informer().HasSynced}
return &gen
}
@ -164,6 +168,10 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
logger.Info("start")
defer logger.Info("shutting down")
if !cache.WaitForNamedCacheSync("requestCreator", stopCh, gen.informersSynced...) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, time.Second, stopCh)
}

View file

@ -13,10 +13,15 @@ import (
type reconcileFunc func(string, string, string) error
func Run(logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, stopCh <-chan struct{}) {
func Run(controllerName string, logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) {
defer runtime.HandleCrash()
logger.Info("starting ...")
defer logger.Info("shutting down")
if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) {
return
}
for i := 0; i < n; i++ {
go wait.Until(func() { worker(logger, queue, maxRetries, r) }, time.Second, stopCh)
}

View file

@ -24,6 +24,7 @@ import (
rbacv1informers "k8s.io/client-go/informers/rbac/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
rbacv1listers "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@ -52,6 +53,8 @@ type auditHandler struct {
crbLister rbacv1listers.ClusterRoleBindingLister
nsLister corev1listers.NamespaceLister
informersSynced []cache.InformerSynced
log logr.Logger
configHandler config.Configuration
promConfig *metrics.PromConfig
@ -69,7 +72,7 @@ func NewValidateAuditHandler(pCache policycache.Cache,
client dclient.Interface,
promConfig *metrics.PromConfig,
) AuditHandler {
return &auditHandler{
c := &auditHandler{
pCache: pCache,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
eventGen: eventGen,
@ -82,6 +85,8 @@ func NewValidateAuditHandler(pCache policycache.Cache,
client: client,
promConfig: promConfig,
}
c.informersSynced = []cache.InformerSynced{rbInformer.Informer().HasSynced, crbInformer.Informer().HasSynced, namespaces.Informer().HasSynced}
return c
}
func (h *auditHandler) Add(request *admissionv1.AdmissionRequest) {
@ -97,6 +102,10 @@ func (h *auditHandler) Run(workers int, stopCh <-chan struct{}) {
h.log.V(4).Info("shutting down")
}()
if !cache.WaitForNamedCacheSync("ValidateAuditHandler", stopCh, h.informersSynced...) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(h.runWorker, time.Second, stopCh)
}