From 8741c340818703fa5ab67f3a3ffa63444cb00a65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Edouard=20Br=C3=A9t=C3=A9ch=C3=A9?= Date: Mon, 26 Sep 2022 17:24:57 +0200 Subject: [PATCH] fix: shutdown controllers workers gracefully (#4681) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Charles-Edouard Brétéché Signed-off-by: Charles-Edouard Brétéché Co-authored-by: Vyankatesh Kudtarkar --- pkg/utils/controller/run.go | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/pkg/utils/controller/run.go b/pkg/utils/controller/run.go index 8a54d4fe58..86e470a109 100644 --- a/pkg/utils/controller/run.go +++ b/pkg/utils/controller/run.go @@ -1,6 +1,8 @@ package controller import ( + "context" + "sync" "time" "github.com/go-logr/logr" @@ -14,18 +16,30 @@ import ( type reconcileFunc func(string, string, string) error func Run(controllerName string, logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) { - defer runtime.HandleCrash() logger.Info("starting ...") - defer logger.Info("shutting down") - - if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { - return - } - - for i := 0; i < n; i++ { - go wait.Until(func() { worker(logger, queue, maxRetries, r) }, time.Second, stopCh) - } - <-stopCh + defer runtime.HandleCrash() + defer logger.Info("stopped") + var wg sync.WaitGroup + func() { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + defer queue.ShutDown() + if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { + return + } + for i := 0; i < n; i++ { + wg.Add(1) + go func(logger logr.Logger) { + logger.Info("starting worker") + defer wg.Done() + defer logger.Info("worker stopped") + wait.Until(func() { worker(logger, queue, maxRetries, r) }, time.Second, ctx.Done()) + }(logger.WithValues("id", i)) + } + <-stopCh + }() + logger.Info("waiting for workers to terminate ...") + wg.Wait() } func worker(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) {