
refactor: more context less chans (#4764)

Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>

This commit is contained in:
Charles-Edouard Brétéché 2022-10-03 11:19:01 +02:00 committed by GitHub
parent 44763cf61b
commit 209bab2059
9 changed files with 101 additions and 51 deletions
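
The change applies one pattern across all nine files: functions that took a stop channel (stopCh <-chan struct{}) now take a context.Context, and call sites that still need a channel derive one with ctx.Done(). A minimal before/after sketch of the signature migration (hypothetical runWithChannel/runWithContext functions, not taken from the diff):

// before: cancellation arrives on a dedicated channel
func runWithChannel(workers int, stopCh <-chan struct{}) {
	<-stopCh // block until asked to stop
}

// after: cancellation rides the context; Done() still yields a channel
func runWithContext(ctx context.Context, workers int) {
	<-ctx.Done() // ctx.Done() returns a <-chan struct{} closed on cancellation
}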


@@ -1,28 +1,55 @@
 package main
 
 import (
+	"context"
 	"reflect"
+
+	"k8s.io/client-go/tools/cache"
 )
 
 // TODO: eventually move this in an util package
-type informer interface {
+type startable interface {
 	Start(stopCh <-chan struct{})
+}
+
+type informer interface {
+	startable
 	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
 }
 
-func startInformers(stopCh <-chan struct{}, informers ...informer) {
+func startInformers[T startable](ctx context.Context, informers ...T) {
 	for i := range informers {
-		informers[i].Start(stopCh)
+		informers[i].Start(ctx.Done())
 	}
 }
 
-func waitForCacheSync(stopCh <-chan struct{}, informers ...informer) {
+func waitForCacheSync(ctx context.Context, informers ...informer) bool {
+	ret := true
 	for i := range informers {
-		informers[i].WaitForCacheSync(stopCh)
+		for _, result := range informers[i].WaitForCacheSync(ctx.Done()) {
+			ret = ret && result
+		}
 	}
+	return ret
 }
 
-func startInformersAndWaitForCacheSync(stopCh <-chan struct{}, informers ...informer) {
-	startInformers(stopCh, informers...)
-	waitForCacheSync(stopCh, informers...)
+func checkCacheSync[T comparable](status map[T]bool) bool {
+	ret := true
+	for _, s := range status {
+		ret = ret && s
+	}
+	return ret
+}
+
+func startInformersAndWaitForCacheSync(ctx context.Context, informers ...informer) bool {
+	startInformers(ctx, informers...)
+	return waitForCacheSync(ctx, informers...)
+}
+
+func waitForInformersCacheSync(ctx context.Context, informers ...cache.SharedInformer) bool {
+	var hasSynced []cache.InformerSynced
+	for i := range informers {
+		hasSynced = append(hasSynced, informers[i].HasSynced)
+	}
+	return cache.WaitForCacheSync(ctx.Done(), hasSynced...)
 }
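
A hypothetical call site for these helpers, mirroring how main.go (below) uses them: startInformers starts every factory, and checkCacheSync folds a per-type sync map into a single boolean.

startInformers(ctx, metadataInformer)
if !checkCacheSync(metadataInformer.WaitForCacheSync(ctx.Done())) {
	// a cache failed to sync (or ctx was cancelled) before use
}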


@@ -52,7 +52,6 @@ import (
 	"k8s.io/client-go/kubernetes"
 	metadataclient "k8s.io/client-go/metadata"
 	metadatainformers "k8s.io/client-go/metadata/metadatainformer"
-	"k8s.io/client-go/tools/cache"
 )
 
 const (
@@ -156,7 +155,6 @@ func main() {
 	signalCtx, signalCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
 	defer signalCancel()
-	stopCh := signalCtx.Done()
 	debug := serverIP != ""
 
 	// clients
@@ -296,6 +294,7 @@ func main() {
 	eventGenerator := event.NewEventGenerator(dynamicClient, kyvernoV1.ClusterPolicies(), kyvernoV1.Policies(), maxQueuedEvents, logging.WithName("EventGenerator"))
 	webhookCfg := webhookconfig.NewRegister(
+		signalCtx,
 		clientConfig,
 		dynamicClient,
 		kubeClient,
@@ -310,7 +309,6 @@ func main() {
 		int32(webhookTimeout),
 		debug,
 		autoUpdateWebhooks,
-		stopCh,
 		logging.GlobalLogger(),
 	)
@@ -442,10 +440,13 @@ func main() {
 		os.Exit(1)
 	}
 	// wait for cache to be synced before use it
-	cache.WaitForCacheSync(stopCh,
-		kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations().Informer().HasSynced,
-		kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer().HasSynced,
-	)
+	if !waitForInformersCacheSync(signalCtx,
+		kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(),
+		kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(),
+	) {
+		// TODO: shall we just exit ?
+		logger.Info("failed to wait for cache sync")
+	}
 	// validate the ConfigMap format
 	if err := webhookCfg.ValidateWebhookConfigurations(config.KyvernoNamespace(), config.KyvernoConfigMapName()); err != nil {
@@ -461,7 +462,7 @@ func main() {
 	}
 	webhookCfg.UpdateWebhookChan <- true
 	go certManager.Run(signalCtx, certmanager.Workers)
-	go policyCtrl.Run(2, stopCh)
+	go policyCtrl.Run(signalCtx, 2)
 	reportControllers := setupReportControllers(
 		backgroundScan,
@@ -472,9 +473,11 @@ func main() {
 		kubeInformer,
 		kyvernoInformer,
 	)
-	metadataInformer.Start(stopCh)
-	metadataInformer.WaitForCacheSync(stopCh)
+	startInformers(signalCtx, metadataInformer)
+	if !checkCacheSync(metadataInformer.WaitForCacheSync(signalCtx.Done())) {
+		// TODO: shall we just exit ?
+		logger.Info("failed to wait for cache sync")
+	}
 	for _, controller := range reportControllers {
 		go controller.run(signalCtx)
@@ -499,10 +502,13 @@ func main() {
 	// cancel leader election context on shutdown signals
 	go func() {
 		defer signalCancel()
-		<-stopCh
+		<-signalCtx.Done()
 	}()
-	startInformersAndWaitForCacheSync(stopCh, kyvernoInformer, kubeInformer, kubeKyvernoInformer)
+	if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
+		logger.Error(err, "Failed to wait for cache sync")
+		os.Exit(1)
+	}
 	// warmup policy cache
 	if err := policyCacheController.WarmUp(); err != nil {
@@ -513,18 +519,19 @@ func main() {
 	// init events handlers
 	// start Kyverno controllers
 	go policyCacheController.Run(signalCtx, policycachecontroller.Workers)
-	go urc.Run(genWorkers, stopCh)
+	go urc.Run(signalCtx, genWorkers)
 	go le.Run(signalCtx)
 	go configurationController.Run(signalCtx, configcontroller.Workers)
-	go eventGenerator.Run(3, stopCh)
+	go eventGenerator.Run(signalCtx, 3)
 	if !debug {
-		go webhookMonitor.Run(webhookCfg, certRenewer, eventGenerator, stopCh)
+		go webhookMonitor.Run(signalCtx, webhookCfg, certRenewer, eventGenerator)
 	}
 	// verifies if the admission control is enabled and active
-	server.Run(stopCh)
-	<-stopCh
+	server.Run(signalCtx.Done())
+	<-signalCtx.Done()
 	// resource cleanup
 	// remove webhook configurations


@@ -38,7 +38,7 @@ const (
 type Controller interface {
 	// Run starts workers
-	Run(int, <-chan struct{})
+	Run(context.Context, int)
 }
 
 // controller manages the life-cycle for Generate-Requests and applies generate rule
@@ -107,27 +107,27 @@ func NewController(
 	return &c
 }
 
-func (c *controller) Run(workers int, stopCh <-chan struct{}) {
+func (c *controller) Run(ctx context.Context, workers int) {
 	defer runtime.HandleCrash()
 	defer c.queue.ShutDown()
 
 	logger.Info("starting")
 	defer logger.Info("shutting down")
 
-	if !cache.WaitForNamedCacheSync("background", stopCh, c.informersSynced...) {
+	if !cache.WaitForNamedCacheSync("background", ctx.Done(), c.informersSynced...) {
 		return
 	}
 
 	for i := 0; i < workers; i++ {
-		go wait.Until(c.worker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, c.worker, time.Second)
 	}
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (c *controller) worker() {
+func (c *controller) worker(ctx context.Context) {
 	for c.processNextWorkItem() {
 	}
 }
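
wait.UntilWithContext, from k8s.io/apimachinery/pkg/util/wait, is what lets the worker loop drop its stop channel: it calls the given function with the context once per period and returns when the context is cancelled, which is why the worker gains a ctx parameter even where its body does not yet use it. A standalone sketch:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func runWorker(ctx context.Context) {
	// Invokes the closure once per second until ctx is done.
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("processing work queue")
	}, time.Second)
}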


@@ -1,6 +1,7 @@
 package event
 
 import (
+	"context"
 	"time"
 
 	"github.com/go-logr/logr"
@@ -115,7 +116,7 @@ func (gen *Generator) Add(infos ...Info) {
 }
 
 // Run begins generator
-func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
+func (gen *Generator) Run(ctx context.Context, workers int) {
 	logger := gen.log
 	defer utilruntime.HandleCrash()
@@ -123,12 +124,12 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
 	defer logger.Info("shutting down")
 
 	for i := 0; i < workers; i++ {
-		go wait.Until(gen.runWorker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, gen.runWorker, time.Second)
 	}
 
-	<-stopCh
+	<-ctx.Done()
 }
 
-func (gen *Generator) runWorker() {
+func (gen *Generator) runWorker(ctx context.Context) {
 	for gen.processNextWorkItem() {
 	}
 }


@@ -346,7 +346,7 @@ func (pc *PolicyController) enqueuePolicy(policy kyvernov1.PolicyInterface) {
 }
 
 // Run begins watching and syncing.
-func (pc *PolicyController) Run(workers int /*, reconcileCh <-chan bool*/ /*, cleanupChangeRequest <-chan policyreport.ReconcileInfo*/, stopCh <-chan struct{}) {
+func (pc *PolicyController) Run(ctx context.Context, workers int) {
 	logger := pc.log
 	defer utilruntime.HandleCrash()
@@ -355,7 +355,7 @@ func (pc *PolicyController) Run(workers int /*, reconcileCh <-chan bool*/ /*, cl
 	logger.Info("starting")
 	defer logger.Info("shutting down")
 
-	if !cache.WaitForNamedCacheSync("PolicyController", stopCh, pc.informersSynced...) {
+	if !cache.WaitForNamedCacheSync("PolicyController", ctx.Done(), pc.informersSynced...) {
 		return
 	}
@@ -372,17 +372,17 @@ func (pc *PolicyController) Run(workers int /*, reconcileCh <-chan bool*/ /*, cl
 	})
 
 	for i := 0; i < workers; i++ {
-		go wait.Until(pc.worker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, pc.worker, time.Second)
 	}
 
-	go pc.forceReconciliation( /*reconcileCh, */ /* cleanupChangeRequest,*/ stopCh)
+	go pc.forceReconciliation(ctx)
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (pc *PolicyController) worker() {
+func (pc *PolicyController) worker(ctx context.Context) {
 	for pc.processNextWorkItem() {
 	}
 }


@@ -1,6 +1,7 @@
 package policy
 
 import (
+	"context"
 	"time"
 
 	"github.com/go-logr/logr"
@@ -21,7 +22,7 @@ func (pc *PolicyController) report(engineResponses []*response.EngineResponse, l
 }
 
 // forceReconciliation forces a background scan by adding all policies to the workqueue
-func (pc *PolicyController) forceReconciliation(stopCh <-chan struct{}) {
+func (pc *PolicyController) forceReconciliation(ctx context.Context) {
 	logger := pc.log.WithName("forceReconciliation")
 	ticker := time.NewTicker(pc.reconcilePeriod)
@@ -31,7 +32,7 @@ func (pc *PolicyController) forceReconciliation(stopCh <-chan struct{}) {
 			logger.Info("performing the background scan", "scan interval", pc.reconcilePeriod.String())
 			pc.requeuePolicies()
 
-		case <-stopCh:
+		case <-ctx.Done():
 			return
 		}
 	}
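
The reconciliation loop above is the standard ticker-plus-context select, extracted here into a self-contained form (hypothetical loop function, not part of the commit):

func loop(ctx context.Context, period time.Duration, work func()) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			work() // periodic work, e.g. requeueing all policies
		case <-ctx.Done():
			return // stop when the context is cancelled
		}
	}
}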


@@ -80,6 +80,7 @@ type manage interface {
 }
 
 func newWebhookConfigManager(
+	ctx context.Context,
 	discoveryClient dclient.IDiscovery,
 	kubeClient kubernetes.Interface,
 	kyvernoClient versioned.Interface,
@@ -91,7 +92,6 @@ func newWebhookConfigManager(
 	serverIP string,
 	autoUpdateWebhooks bool,
 	createDefaultWebhook chan<- string,
-	stopCh <-chan struct{},
 	log logr.Logger,
 ) manage {
 	m := &webhookConfigManager{
@@ -112,7 +112,7 @@ func newWebhookConfigManager(
 		serverIP:             serverIP,
 		autoUpdateWebhooks:   autoUpdateWebhooks,
 		createDefaultWebhook: createDefaultWebhook,
-		stopCh:               stopCh,
+		stopCh:               ctx.Done(),
 		log:                  log,
 	}


@@ -78,7 +78,7 @@ func (t *Monitor) SetTime(tm time.Time) {
 }
 
 // Run runs the checker and verify the resource update
-func (t *Monitor) Run(register *Register, certRenewer *tls.CertRenewer, eventGen event.Interface, stopCh <-chan struct{}) {
+func (t *Monitor) Run(ctx context.Context, register *Register, certRenewer *tls.CertRenewer, eventGen event.Interface) {
 	logger := t.log.WithName("webhookMonitor")
 
 	logger.V(3).Info("starting webhook monitor", "interval", idleCheckInterval.String())
@@ -178,7 +178,7 @@ func (t *Monitor) Run(register *Register, certRenewer *tls.CertRenewer, eventGen
 				logger.Error(err, "failed to annotate deployment webhook status to success")
 			}
 
-		case <-stopCh:
+		case <-ctx.Done():
 			// handler termination signal
 			logger.V(2).Info("stopping webhook monitor")
 			return


@@ -71,6 +71,7 @@ type Register struct {
 
 // NewRegister creates new Register instance
 func NewRegister(
+	ctx context.Context,
 	clientConfig *rest.Config,
 	client dclient.Interface,
 	kubeClient kubernetes.Interface,
@@ -85,7 +86,6 @@ func NewRegister(
 	webhookTimeout int32,
 	debug bool,
 	autoUpdateWebhooks bool,
-	stopCh <-chan struct{},
 	log logr.Logger,
 ) *Register {
 	register := &Register{
@@ -98,7 +98,7 @@ func NewRegister(
 		metricsConfig:        metricsConfig,
 		UpdateWebhookChan:    make(chan bool),
 		createDefaultWebhook: make(chan string),
-		stopCh:               stopCh,
+		stopCh:               ctx.Done(),
 		serverIP:             serverIP,
 		timeoutSeconds:       webhookTimeout,
 		log:                  log.WithName("Register"),
@@ -106,7 +106,21 @@ func NewRegister(
 		autoUpdateWebhooks: autoUpdateWebhooks,
 	}
 
-	register.manage = newWebhookConfigManager(client.Discovery(), kubeClient, kyvernoClient, pInformer, npInformer, mwcInformer, vwcInformer, metricsConfig, serverIP, register.autoUpdateWebhooks, register.createDefaultWebhook, stopCh, log.WithName("WebhookConfigManager"))
+	register.manage = newWebhookConfigManager(
+		ctx,
+		client.Discovery(),
+		kubeClient,
+		kyvernoClient,
+		pInformer,
+		npInformer,
+		mwcInformer,
+		vwcInformer,
+		metricsConfig,
+		serverIP,
+		register.autoUpdateWebhooks,
+		register.createDefaultWebhook,
+		log.WithName("WebhookConfigManager"),
+	)
 
 	return register
 }
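
Where a struct still stores a channel, as with the stopCh fields in Register and webhookConfigManager, the constructors bridge from the context instead of threading a channel parameter through: ctx.Done() returns a receive-only channel that is closed when the context is cancelled, so the field type and every reader of it stay unchanged. The same bridge appears in main.go above as server.Run(signalCtx.Done()). A one-line sketch:

var stopCh <-chan struct{} = ctx.Done() // closed on cancellation; channel-based APIs keep working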