From fc453b1faa6b4f9576c6a4b7d7cc78d76b0b04f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Edouard=20Br=C3=A9t=C3=A9ch=C3=A9?= Date: Fri, 17 Mar 2023 11:48:48 +0100 Subject: [PATCH] fix: improve shutdown gracefulness (#5107) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: improve shutdown gracefulness Signed-off-by: Charles-Edouard Brétéché * fix Signed-off-by: Charles-Edouard Brétéché * fix Signed-off-by: Charles-Edouard Brétéché * fix Signed-off-by: Charles-Edouard Brétéché * fix Signed-off-by: Charles-Edouard Brétéché * fix Signed-off-by: Charles-Edouard Brétéché --------- Signed-off-by: Charles-Edouard Brétéché Co-authored-by: shuting --- cmd/background-controller/main.go | 5 +++- cmd/kyverno/main.go | 5 ++-- cmd/reports-controller/main.go | 4 ++- pkg/controllers/metrics/policy/controller.go | 31 +++++++++++++++----- pkg/event/controller.go | 22 +++++++------- pkg/utils/controller/run.go | 8 ++--- 6 files changed, 48 insertions(+), 27 deletions(-) diff --git a/cmd/background-controller/main.go b/cmd/background-controller/main.go index 95944d8dc8..88e09327ca 100644 --- a/cmd/background-controller/main.go +++ b/cmd/background-controller/main.go @@ -206,10 +206,12 @@ func main() { logging.WithName("EventGenerator"), ) // this controller only subscribe to events, nothing is returned... + var wg sync.WaitGroup policymetricscontroller.NewController( metricsConfig, kyvernoInformer.Kyverno().V1().ClusterPolicies(), kyvernoInformer.Kyverno().V1().Policies(), + &wg, ) engine := engine.NewEngine( configuration, @@ -225,7 +227,7 @@ func main() { os.Exit(1) } // start event generator - go eventGenerator.Run(signalCtx, 3) + go eventGenerator.Run(signalCtx, 3, &wg) // setup leader election le, err := leaderelection.New( logger.WithName("leader-election"), @@ -280,6 +282,7 @@ func main() { for { select { case <-signalCtx.Done(): + wg.Wait() return default: le.Run(signalCtx) diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 5cd13d710e..fd77c6d7bb 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -332,6 +332,7 @@ func main() { logger.Error(err, "Failed to create openapi manager") os.Exit(1) } + var wg sync.WaitGroup certRenewer := tls.NewCertRenewer( kubeClient.CoreV1().Secrets(config.KyvernoNamespace()), secretLister, @@ -353,6 +354,7 @@ func main() { metricsConfig, kyvernoInformer.Kyverno().V1().ClusterPolicies(), kyvernoInformer.Kyverno().V1().Policies(), + &wg, ) // log policy changes genericloggingcontroller.NewController( @@ -415,7 +417,7 @@ func main() { } } // start event generator - go eventGenerator.Run(signalCtx, 3) + go eventGenerator.Run(signalCtx, 3, &wg) // setup leader election le, err := leaderelection.New( logger.WithName("leader-election"), @@ -476,7 +478,6 @@ func main() { os.Exit(1) } // start non leader controllers - var wg sync.WaitGroup for _, controller := range nonLeaderControllers { controller.Run(signalCtx, logger.WithName("controllers"), &wg) } diff --git a/cmd/reports-controller/main.go b/cmd/reports-controller/main.go index 07d47ce37c..be007985b9 100644 --- a/cmd/reports-controller/main.go +++ b/cmd/reports-controller/main.go @@ -310,7 +310,8 @@ func main() { os.Exit(1) } // start event generator - go eventGenerator.Run(ctx, 3) + var wg sync.WaitGroup + go eventGenerator.Run(ctx, 3, &wg) eng := engine.NewEngine( configuration, dClient, @@ -386,6 +387,7 @@ func main() { for { select { case <-ctx.Done(): + wg.Wait() return default: le.Run(ctx) diff --git a/pkg/controllers/metrics/policy/controller.go b/pkg/controllers/metrics/policy/controller.go index 8a9eed1e8e..1e5ff42a52 100644 --- a/pkg/controllers/metrics/policy/controller.go +++ b/pkg/controllers/metrics/policy/controller.go @@ -2,6 +2,7 @@ package policy import ( "context" + "sync" kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1" "github.com/kyverno/kyverno/pkg/autogen" @@ -25,10 +26,17 @@ type controller struct { // listers cpolLister kyvernov1listers.ClusterPolicyLister polLister kyvernov1listers.PolicyLister + + waitGroup *sync.WaitGroup } // TODO: this is a strange controller, it only processes events, this should be changed to a real controller. -func NewController(metricsConfig metrics.MetricsConfigManager, cpolInformer kyvernov1informers.ClusterPolicyInformer, polInformer kyvernov1informers.PolicyInformer) { +func NewController( + metricsConfig metrics.MetricsConfigManager, + cpolInformer kyvernov1informers.ClusterPolicyInformer, + polInformer kyvernov1informers.PolicyInformer, + waitGroup *sync.WaitGroup, +) { meterProvider := global.MeterProvider() meter := meterProvider.Meter(metrics.MeterName) policyRuleInfoMetric, err := meter.Float64ObservableGauge( @@ -43,6 +51,7 @@ func NewController(metricsConfig metrics.MetricsConfigManager, cpolInformer kyve ruleInfo: policyRuleInfoMetric, cpolLister: cpolInformer.Lister(), polLister: polInformer.Lister(), + waitGroup: waitGroup, } controllerutils.AddEventHandlers(cpolInformer.Informer(), c.addPolicy, c.updatePolicy, c.deletePolicy) controllerutils.AddEventHandlers(polInformer.Informer(), c.addNsPolicy, c.updateNsPolicy, c.deleteNsPolicy) @@ -111,16 +120,24 @@ func (c *controller) reportPolicy(ctx context.Context, policy kyvernov1.PolicyIn return nil } +func (c *controller) startRountine(routine func()) { + c.waitGroup.Add(1) + go func() { + defer c.waitGroup.Done() + routine() + }() +} + func (c *controller) addPolicy(obj interface{}) { p := obj.(*kyvernov1.ClusterPolicy) // register kyverno_policy_changes_total metric concurrently - go c.registerPolicyChangesMetricAddPolicy(context.TODO(), logger, p) + c.startRountine(func() { c.registerPolicyChangesMetricAddPolicy(context.TODO(), logger, p) }) } func (c *controller) updatePolicy(old, cur interface{}) { oldP, curP := old.(*kyvernov1.ClusterPolicy), cur.(*kyvernov1.ClusterPolicy) // register kyverno_policy_changes_total metric concurrently - go c.registerPolicyChangesMetricUpdatePolicy(context.TODO(), logger, oldP, curP) + c.startRountine(func() { c.registerPolicyChangesMetricUpdatePolicy(context.TODO(), logger, oldP, curP) }) } func (c *controller) deletePolicy(obj interface{}) { @@ -130,19 +147,19 @@ func (c *controller) deletePolicy(obj interface{}) { return } // register kyverno_policy_changes_total metric concurrently - go c.registerPolicyChangesMetricDeletePolicy(context.TODO(), logger, p) + c.startRountine(func() { c.registerPolicyChangesMetricDeletePolicy(context.TODO(), logger, p) }) } func (c *controller) addNsPolicy(obj interface{}) { p := obj.(*kyvernov1.Policy) // register kyverno_policy_changes_total metric concurrently - go c.registerPolicyChangesMetricAddPolicy(context.TODO(), logger, p) + c.startRountine(func() { c.registerPolicyChangesMetricAddPolicy(context.TODO(), logger, p) }) } func (c *controller) updateNsPolicy(old, cur interface{}) { oldP, curP := old.(*kyvernov1.Policy), cur.(*kyvernov1.Policy) // register kyverno_policy_changes_total metric concurrently - go c.registerPolicyChangesMetricUpdatePolicy(context.TODO(), logger, oldP, curP) + c.startRountine(func() { c.registerPolicyChangesMetricUpdatePolicy(context.TODO(), logger, oldP, curP) }) } func (c *controller) deleteNsPolicy(obj interface{}) { @@ -152,5 +169,5 @@ func (c *controller) deleteNsPolicy(obj interface{}) { return } // register kyverno_policy_changes_total metric concurrently - go c.registerPolicyChangesMetricDeletePolicy(context.TODO(), logger, p) + c.startRountine(func() { c.registerPolicyChangesMetricDeletePolicy(context.TODO(), logger, p) }) } diff --git a/pkg/event/controller.go b/pkg/event/controller.go index 8844ca6650..ca75ed2a67 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -2,13 +2,13 @@ package event import ( "context" + "sync" "time" "github.com/go-logr/logr" kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1" kyvernov1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1" "github.com/kyverno/kyverno/pkg/clients/dclient" - "github.com/kyverno/kyverno/pkg/controllers" corev1 "k8s.io/api/core/v1" errors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -48,8 +48,8 @@ type generator struct { // Controller interface to generate event type Controller interface { - controllers.Controller Interface + Run(context.Context, int, *sync.WaitGroup) } // Interface to generate event @@ -84,13 +84,11 @@ func NewEventGenerator( // Add queues an event for generation func (gen *generator) Add(infos ...Info) { logger := gen.log - logger.V(3).Info("generating events", "count", len(infos)) if gen.maxQueuedEvents == 0 || gen.queue.Len() > gen.maxQueuedEvents { logger.V(2).Info("exceeds the event queue limit, dropping the event", "maxQueuedEvents", gen.maxQueuedEvents, "current size", gen.queue.Len()) return } - for _, info := range infos { if info.Name == "" { // dont create event for resources with generateName @@ -103,15 +101,18 @@ func (gen *generator) Add(infos ...Info) { } // Run begins generator -func (gen *generator) Run(ctx context.Context, workers int) { +func (gen *generator) Run(ctx context.Context, workers int, waitGroup *sync.WaitGroup) { logger := gen.log - defer utilruntime.HandleCrash() - logger.Info("start") defer logger.Info("shutting down") - + defer utilruntime.HandleCrash() + defer gen.queue.ShutDown() for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, gen.runWorker, time.Second) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + wait.UntilWithContext(ctx, gen.runWorker, time.Second) + }() } <-ctx.Done() } @@ -135,7 +136,6 @@ func (gen *generator) handleErr(err error, key interface{}) { gen.queue.AddRateLimited(key) return } - gen.queue.Forget(key) if !errors.IsNotFound(err) { logger.Error(err, "failed to generate event", "key", key) @@ -147,7 +147,6 @@ func (gen *generator) processNextWorkItem() bool { if shutdown { return false } - defer gen.queue.Done(obj) var key Info var ok bool @@ -158,7 +157,6 @@ func (gen *generator) processNextWorkItem() bool { } err := gen.syncHandler(key) gen.handleErr(err, obj) - return true } diff --git a/pkg/utils/controller/run.go b/pkg/utils/controller/run.go index 133a68bd65..a2654d3744 100644 --- a/pkg/utils/controller/run.go +++ b/pkg/utils/controller/run.go @@ -56,9 +56,10 @@ func newControllerMetrics(logger logr.Logger, controllerName string) *controller func Run(ctx context.Context, logger logr.Logger, controllerName string, period time.Duration, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, routines ...func(context.Context, logr.Logger)) { logger.Info("starting ...") - defer runtime.HandleCrash() defer logger.Info("stopped") var wg sync.WaitGroup + defer wg.Wait() + defer runtime.HandleCrash() metric := newControllerMetrics(logger, controllerName) func() { ctx, cancel := context.WithCancel(ctx) @@ -68,8 +69,8 @@ func Run(ctx context.Context, logger logr.Logger, controllerName string, period wg.Add(1) go func(logger logr.Logger) { logger.Info("starting worker") - defer wg.Done() defer logger.Info("worker stopped") + defer wg.Done() wait.UntilWithContext(ctx, func(ctx context.Context) { worker(ctx, logger, metric, queue, maxRetries, r) }, period) }(logger.WithName("worker").WithValues("id", i)) } @@ -77,15 +78,14 @@ func Run(ctx context.Context, logger logr.Logger, controllerName string, period wg.Add(1) go func(logger logr.Logger, routine func(context.Context, logr.Logger)) { logger.Info("starting routine") - defer wg.Done() defer logger.Info("routine stopped") + defer wg.Done() routine(ctx, logger) }(logger.WithName("routine").WithValues("id", i), routine) } <-ctx.Done() }() logger.Info("waiting for workers to terminate ...") - wg.Wait() } func worker(ctx context.Context, logger logr.Logger, metric *controllerMetrics, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) {