From 209bab205910006adb648f85d3d42aedf4d92c2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Edouard=20Br=C3=A9t=C3=A9ch=C3=A9?= <charled.breteche@gmail.com> Date: Mon, 3 Oct 2022 11:19:01 +0200 Subject: [PATCH] refactor: more context less chans (#4764) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com> Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com> --- cmd/kyverno/informer.go | 43 +++++++++++++++++---- cmd/kyverno/main.go | 43 ++++++++++++--------- pkg/background/update_request_controller.go | 12 +++--- pkg/event/controller.go | 9 +++-- pkg/policy/policy_controller.go | 12 +++--- pkg/policy/report.go | 5 ++- pkg/webhookconfig/configmanager.go | 4 +- pkg/webhookconfig/monitor.go | 4 +- pkg/webhookconfig/registration.go | 20 ++++++++-- 9 files changed, 101 insertions(+), 51 deletions(-) diff --git a/cmd/kyverno/informer.go b/cmd/kyverno/informer.go index f8c2073048..9aeda020a3 100644 --- a/cmd/kyverno/informer.go +++ b/cmd/kyverno/informer.go @@ -1,28 +1,55 @@ package main import ( + "context" "reflect" + + "k8s.io/client-go/tools/cache" ) // TODO: eventually move this in an util package -type informer interface { +type startable interface { Start(stopCh <-chan struct{}) +} + +type informer interface { + startable WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool } -func startInformers(stopCh <-chan struct{}, informers ...informer) { +func startInformers[T startable](ctx context.Context, informers ...T) { for i := range informers { - informers[i].Start(stopCh) + informers[i].Start(ctx.Done()) } } -func waitForCacheSync(stopCh <-chan struct{}, informers ...informer) { +func waitForCacheSync(ctx context.Context, informers ...informer) bool { + ret := true for i := range informers { - informers[i].WaitForCacheSync(stopCh) + for _, result := range informers[i].WaitForCacheSync(ctx.Done()) { + ret = ret && result + } } + return ret } -func startInformersAndWaitForCacheSync(stopCh <-chan struct{}, informers ...informer) { - startInformers(stopCh, informers...) - waitForCacheSync(stopCh, informers...) +func checkCacheSync[T comparable](status map[T]bool) bool { + ret := true + for _, s := range status { + ret = ret && s + } + return ret +} + +func startInformersAndWaitForCacheSync(ctx context.Context, informers ...informer) bool { + startInformers(ctx, informers...) + return waitForCacheSync(ctx, informers...) +} + +func waitForInformersCacheSync(ctx context.Context, informers ...cache.SharedInformer) bool { + var hasSynced []cache.InformerSynced + for i := range informers { + hasSynced = append(hasSynced, informers[i].HasSynced) + } + return cache.WaitForCacheSync(ctx.Done(), hasSynced...) } diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 16565afd68..c837318c93 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -52,7 +52,6 @@ import ( "k8s.io/client-go/kubernetes" metadataclient "k8s.io/client-go/metadata" metadatainformers "k8s.io/client-go/metadata/metadatainformer" - "k8s.io/client-go/tools/cache" ) const ( @@ -156,7 +155,6 @@ func main() { signalCtx, signalCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer signalCancel() - stopCh := signalCtx.Done() debug := serverIP != "" // clients @@ -296,6 +294,7 @@ func main() { eventGenerator := event.NewEventGenerator(dynamicClient, kyvernoV1.ClusterPolicies(), kyvernoV1.Policies(), maxQueuedEvents, logging.WithName("EventGenerator")) webhookCfg := webhookconfig.NewRegister( + signalCtx, clientConfig, dynamicClient, kubeClient, @@ -310,7 +309,6 @@ func main() { int32(webhookTimeout), debug, autoUpdateWebhooks, - stopCh, logging.GlobalLogger(), ) @@ -442,10 +440,13 @@ func main() { os.Exit(1) } // wait for cache to be synced before use it - cache.WaitForCacheSync(stopCh, - kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations().Informer().HasSynced, - kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced, - ) + if !waitForInformersCacheSync(signalCtx, + kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(), + kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(), + ) { + // TODO: shall we just exit ? + logger.Info("failed to wait for cache sync") + } // validate the ConfigMap format if err := webhookCfg.ValidateWebhookConfigurations(config.KyvernoNamespace(), config.KyvernoConfigMapName()); err != nil { @@ -461,7 +462,7 @@ func main() { } webhookCfg.UpdateWebhookChan <- true go certManager.Run(signalCtx, certmanager.Workers) - go policyCtrl.Run(2, stopCh) + go policyCtrl.Run(signalCtx, 2) reportControllers := setupReportControllers( backgroundScan, @@ -472,9 +473,11 @@ func main() { kubeInformer, kyvernoInformer, ) - - metadataInformer.Start(stopCh) - metadataInformer.WaitForCacheSync(stopCh) + startInformers(signalCtx, metadataInformer) + if !checkCacheSync(metadataInformer.WaitForCacheSync(signalCtx.Done())) { + // TODO: shall we just exit ? + logger.Info("failed to wait for cache sync") + } for _, controller := range reportControllers { go controller.run(signalCtx) @@ -499,10 +502,13 @@ func main() { // cancel leader election context on shutdown signals go func() { defer signalCancel() - <-stopCh + <-signalCtx.Done() }() - startInformersAndWaitForCacheSync(stopCh, kyvernoInformer, kubeInformer, kubeKyvernoInformer) + if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) { + logger.Error(err, "Failed to wait for cache sync") + os.Exit(1) + } // warmup policy cache if err := policyCacheController.WarmUp(); err != nil { @@ -513,18 +519,19 @@ func main() { // init events handlers // start Kyverno controllers go policyCacheController.Run(signalCtx, policycachecontroller.Workers) - go urc.Run(genWorkers, stopCh) + go urc.Run(signalCtx, genWorkers) go le.Run(signalCtx) go configurationController.Run(signalCtx, configcontroller.Workers) - go eventGenerator.Run(3, stopCh) + go eventGenerator.Run(signalCtx, 3) + if !debug { - go webhookMonitor.Run(webhookCfg, certRenewer, eventGenerator, stopCh) + go webhookMonitor.Run(signalCtx, webhookCfg, certRenewer, eventGenerator) } // verifies if the admission control is enabled and active - server.Run(stopCh) + server.Run(signalCtx.Done()) - <-stopCh + <-signalCtx.Done() // resource cleanup // remove webhook configurations diff --git a/pkg/background/update_request_controller.go b/pkg/background/update_request_controller.go index d6948326f8..735c616fbd 100644 --- a/pkg/background/update_request_controller.go +++ b/pkg/background/update_request_controller.go @@ -38,7 +38,7 @@ const ( type Controller interface { // Run starts workers - Run(int, <-chan struct{}) + Run(context.Context, int) } // controller manages the life-cycle for Generate-Requests and applies generate rule @@ -107,27 +107,27 @@ func NewController( return &c } -func (c *controller) Run(workers int, stopCh <-chan struct{}) { +func (c *controller) Run(ctx context.Context, workers int) { defer runtime.HandleCrash() defer c.queue.ShutDown() logger.Info("starting") defer logger.Info("shutting down") - if !cache.WaitForNamedCacheSync("background", stopCh, c.informersSynced...) { + if !cache.WaitForNamedCacheSync("background", ctx.Done(), c.informersSynced...) { return } for i := 0; i < workers; i++ { - go wait.Until(c.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, c.worker, time.Second) } - <-stopCh + <-ctx.Done() } // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. -func (c *controller) worker() { +func (c *controller) worker(ctx context.Context) { for c.processNextWorkItem() { } } diff --git a/pkg/event/controller.go b/pkg/event/controller.go index 5e8bc253a1..cebf7a496d 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -1,6 +1,7 @@ package event import ( + "context" "time" "github.com/go-logr/logr" @@ -115,7 +116,7 @@ func (gen *Generator) Add(infos ...Info) { } // Run begins generator -func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { +func (gen *Generator) Run(ctx context.Context, workers int) { logger := gen.log defer utilruntime.HandleCrash() @@ -123,12 +124,12 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) { defer logger.Info("shutting down") for i := 0; i < workers; i++ { - go wait.Until(gen.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, gen.runWorker, time.Second) } - <-stopCh + <-ctx.Done() } -func (gen *Generator) runWorker() { +func (gen *Generator) runWorker(ctx context.Context) { for gen.processNextWorkItem() { } } diff --git a/pkg/policy/policy_controller.go b/pkg/policy/policy_controller.go index d6e53750a3..d5a7c32757 100644 --- a/pkg/policy/policy_controller.go +++ b/pkg/policy/policy_controller.go @@ -346,7 +346,7 @@ func (pc *PolicyController) enqueuePolicy(policy kyvernov1.PolicyInterface) { } // Run begins watching and syncing. -func (pc *PolicyController) Run(workers int /*, reconcileCh <-chan bool*/ /*, cleanupChangeRequest <-chan policyreport.ReconcileInfo*/, stopCh <-chan struct{}) { +func (pc *PolicyController) Run(ctx context.Context, workers int) { logger := pc.log defer utilruntime.HandleCrash() @@ -355,7 +355,7 @@ func (pc *PolicyController) Run(workers int /*, reconcileCh <-chan bool*/ /*, cl logger.Info("starting") defer logger.Info("shutting down") - if !cache.WaitForNamedCacheSync("PolicyController", stopCh, pc.informersSynced...) { + if !cache.WaitForNamedCacheSync("PolicyController", ctx.Done(), pc.informersSynced...) { return } @@ -372,17 +372,17 @@ func (pc *PolicyController) Run(workers int /*, reconcileCh <-chan bool*/ /*, cl }) for i := 0; i < workers; i++ { - go wait.Until(pc.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, pc.worker, time.Second) } - go pc.forceReconciliation( /*reconcileCh, */ /* cleanupChangeRequest,*/ stopCh) + go pc.forceReconciliation(ctx) - <-stopCh + <-ctx.Done() } // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. -func (pc *PolicyController) worker() { +func (pc *PolicyController) worker(ctx context.Context) { for pc.processNextWorkItem() { } } diff --git a/pkg/policy/report.go b/pkg/policy/report.go index f74972cfbd..4731752b69 100644 --- a/pkg/policy/report.go +++ b/pkg/policy/report.go @@ -1,6 +1,7 @@ package policy import ( + "context" "time" "github.com/go-logr/logr" @@ -21,7 +22,7 @@ func (pc *PolicyController) report(engineResponses []*response.EngineResponse, l } // forceReconciliation forces a background scan by adding all policies to the workqueue -func (pc *PolicyController) forceReconciliation(stopCh <-chan struct{}) { +func (pc *PolicyController) forceReconciliation(ctx context.Context) { logger := pc.log.WithName("forceReconciliation") ticker := time.NewTicker(pc.reconcilePeriod) @@ -31,7 +32,7 @@ func (pc *PolicyController) forceReconciliation(stopCh <-chan struct{}) { logger.Info("performing the background scan", "scan interval", pc.reconcilePeriod.String()) pc.requeuePolicies() - case <-stopCh: + case <-ctx.Done(): return } } diff --git a/pkg/webhookconfig/configmanager.go b/pkg/webhookconfig/configmanager.go index e2fa3fb9f6..470257b88b 100644 --- a/pkg/webhookconfig/configmanager.go +++ b/pkg/webhookconfig/configmanager.go @@ -80,6 +80,7 @@ type manage interface { } func newWebhookConfigManager( + ctx context.Context, discoveryClient dclient.IDiscovery, kubeClient kubernetes.Interface, kyvernoClient versioned.Interface, @@ -91,7 +92,6 @@ func newWebhookConfigManager( serverIP string, autoUpdateWebhooks bool, createDefaultWebhook chan<- string, - stopCh <-chan struct{}, log logr.Logger, ) manage { m := &webhookConfigManager{ @@ -112,7 +112,7 @@ func newWebhookConfigManager( serverIP: serverIP, autoUpdateWebhooks: autoUpdateWebhooks, createDefaultWebhook: createDefaultWebhook, - stopCh: stopCh, + stopCh: ctx.Done(), log: log, } diff --git a/pkg/webhookconfig/monitor.go b/pkg/webhookconfig/monitor.go index 1444719c27..baf0b50f84 100644 --- a/pkg/webhookconfig/monitor.go +++ b/pkg/webhookconfig/monitor.go @@ -78,7 +78,7 @@ func (t *Monitor) SetTime(tm time.Time) { } // Run runs the checker and verify the resource update -func (t *Monitor) Run(register *Register, certRenewer *tls.CertRenewer, eventGen event.Interface, stopCh <-chan struct{}) { +func (t *Monitor) Run(ctx context.Context, register *Register, certRenewer *tls.CertRenewer, eventGen event.Interface) { logger := t.log.WithName("webhookMonitor") logger.V(3).Info("starting webhook monitor", "interval", idleCheckInterval.String()) @@ -178,7 +178,7 @@ func (t *Monitor) Run(register *Register, certRenewer *tls.CertRenewer, eventGen logger.Error(err, "failed to annotate deployment webhook status to success") } - case <-stopCh: + case <-ctx.Done(): // handler termination signal logger.V(2).Info("stopping webhook monitor") return diff --git a/pkg/webhookconfig/registration.go b/pkg/webhookconfig/registration.go index 13c807a0a6..ed50b33dad 100644 --- a/pkg/webhookconfig/registration.go +++ b/pkg/webhookconfig/registration.go @@ -71,6 +71,7 @@ type Register struct { // NewRegister creates new Register instance func NewRegister( + ctx context.Context, clientConfig *rest.Config, client dclient.Interface, kubeClient kubernetes.Interface, @@ -85,7 +86,6 @@ func NewRegister( webhookTimeout int32, debug bool, autoUpdateWebhooks bool, - stopCh <-chan struct{}, log logr.Logger, ) *Register { register := &Register{ @@ -98,7 +98,7 @@ func NewRegister( metricsConfig: metricsConfig, UpdateWebhookChan: make(chan bool), createDefaultWebhook: make(chan string), - stopCh: stopCh, + stopCh: ctx.Done(), serverIP: serverIP, timeoutSeconds: webhookTimeout, log: log.WithName("Register"), @@ -106,7 +106,21 @@ func NewRegister( autoUpdateWebhooks: autoUpdateWebhooks, } - register.manage = newWebhookConfigManager(client.Discovery(), kubeClient, kyvernoClient, pInformer, npInformer, mwcInformer, vwcInformer, metricsConfig, serverIP, register.autoUpdateWebhooks, register.createDefaultWebhook, stopCh, log.WithName("WebhookConfigManager")) + register.manage = newWebhookConfigManager( + ctx, + client.Discovery(), + kubeClient, + kyvernoClient, + pInformer, + npInformer, + mwcInformer, + vwcInformer, + metricsConfig, + serverIP, + register.autoUpdateWebhooks, + register.createDefaultWebhook, + log.WithName("WebhookConfigManager"), + ) return register }