1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-31 03:45:17 +00:00

fix: shutdown controllers workers gracefully (#4681)

Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>

Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
Co-authored-by: Vyankatesh Kudtarkar <vyankateshkd@gmail.com>
This commit is contained in:
Charles-Edouard Brétéché 2022-09-26 17:24:57 +02:00 committed by GitHub
parent e305aea95c
commit 8741c34081
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,6 +1,8 @@
package controller
import (
"context"
"sync"
"time"
"github.com/go-logr/logr"
@ -14,18 +16,30 @@ import (
type reconcileFunc func(string, string, string) error
func Run(controllerName string, logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) {
defer runtime.HandleCrash()
logger.Info("starting ...")
defer logger.Info("shutting down")
if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) {
return
}
for i := 0; i < n; i++ {
go wait.Until(func() { worker(logger, queue, maxRetries, r) }, time.Second, stopCh)
}
<-stopCh
defer runtime.HandleCrash()
defer logger.Info("stopped")
var wg sync.WaitGroup
func() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
defer queue.ShutDown()
if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) {
return
}
for i := 0; i < n; i++ {
wg.Add(1)
go func(logger logr.Logger) {
logger.Info("starting worker")
defer wg.Done()
defer logger.Info("worker stopped")
wait.Until(func() { worker(logger, queue, maxRetries, r) }, time.Second, ctx.Done())
}(logger.WithValues("id", i))
}
<-stopCh
}()
logger.Info("waiting for workers to terminate ...")
wg.Wait()
}
func worker(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) {