mirror of https://github.com/kyverno/kyverno.git synced 2024-12-14 11:57:48 +00:00

refactor: non leader controllers management (#4831)

Authored by Charles-Edouard Brétéché on 2022-10-06 12:38:35 +02:00, committed by GitHub
parent 74172f2079
commit 1509fa6251
16 changed files with 174 additions and 128 deletions
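
In outline, this change wraps each controller in a small named struct (name, wrapped controller, worker count) and starts every non-leader controller uniformly from main. The following is a minimal, self-contained sketch of that pattern, reconstructed from the hunks below rather than taken from the files verbatim; the Controller interface here is a stand-in for pkg/controllers.Controller, and waitController exists only for this illustration.

package main

import (
	"context"

	"github.com/go-logr/logr"
)

// Controller is a stand-in for the pkg/controllers.Controller interface.
type Controller interface {
	Run(ctx context.Context, workers int)
}

// controller pairs a Controller with a name and worker count.
type controller struct {
	name       string
	controller Controller
	workers    int
}

func newController(name string, c Controller, w int) controller {
	return controller{name: name, controller: c, workers: w}
}

// run logs the controller name and delegates to the wrapped controller.
func (c controller) run(ctx context.Context, logger logr.Logger) {
	logger.Info("start controller...", "name", c.name)
	c.controller.Run(ctx, c.workers)
}

// waitController blocks until the context is cancelled; used only for this sketch.
type waitController struct{}

func (waitController) Run(ctx context.Context, workers int) { <-ctx.Done() }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	logger := logr.Discard()
	// Non-leader controllers are built in one place and all started the same way;
	// the real binary then blocks on signal handling and leader election.
	nonLeaderControllers := []controller{
		newController("config-controller", waitController{}, 3),
		newController("policycache-controller", waitController{}, 3),
	}
	for _, c := range nonLeaderControllers {
		go c.run(ctx, logger.WithName("controllers"))
	}
}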


@@ -3,14 +3,25 @@ package main
import (
"context"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/controllers"
)
type controller struct {
name string
controller controllers.Controller
workers int
}
func (c *controller) run(ctx context.Context) {
func newController(name string, c controllers.Controller, w int) controller {
return controller{
name: name,
controller: c,
workers: w,
}
}
func (c controller) run(ctx context.Context, logger logr.Logger) {
logger.Info("start controller...", "name", c.name)
c.controller.Run(ctx, c.workers)
}


@@ -291,6 +291,53 @@ func sanityChecks(dynamicClient dclient.Interface) error {
return nil
}
func createNonLeaderControllers(
kubeInformer kubeinformers.SharedInformerFactory,
kubeKyvernoInformer kubeinformers.SharedInformerFactory,
kyvernoInformer kyvernoinformer.SharedInformerFactory,
kubeClient kubernetes.Interface,
kyvernoClient versioned.Interface,
dynamicClient dclient.Interface,
configuration config.Configuration,
policyCache policycache.Cache,
eventGenerator event.Interface,
manager *openapi.Controller,
) ([]controller, func() error) {
policyCacheController := policycachecontroller.NewController(
policyCache,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
)
openApiController := openapi.NewCRDSync(
dynamicClient,
manager,
)
configurationController := configcontroller.NewController(
configuration,
kubeKyvernoInformer.Core().V1().ConfigMaps(),
)
updateRequestController := background.NewController(
kyvernoClient,
dynamicClient,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
kubeInformer.Core().V1().Namespaces(),
kubeKyvernoInformer.Core().V1().Pods(),
eventGenerator,
configuration,
)
return []controller{
newController(policycachecontroller.ControllerName, policyCacheController, policycachecontroller.Workers),
newController("openapi-controller", openApiController, 1),
newController(configcontroller.ControllerName, configurationController, configcontroller.Workers),
newController("update-request-controller", updateRequestController, genWorkers),
},
func() error {
return policyCacheController.WarmUp()
}
}
func main() {
// parse flags
if err := parseFlags(); err != nil {
@@ -358,14 +405,6 @@ func main() {
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
metadataInformer := metadatainformers.NewSharedInformerFactory(metadataClient, 15*time.Minute)
// utils
kyvernoV1 := kyvernoInformer.Kyverno().V1()
kyvernoV1beta1 := kyvernoInformer.Kyverno().V1beta1()
// EVENT GENERATOR
// - generate event with retry mechanism
eventGenerator := event.NewEventGenerator(dynamicClient, kyvernoV1.ClusterPolicies(), kyvernoV1.Policies(), maxQueuedEvents, logging.WithName("EventGenerator"))
webhookCfg := webhookconfig.NewRegister(
signalCtx,
clientConfig,
@@ -375,14 +414,35 @@ func main() {
kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations(),
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
kubeKyvernoInformer.Apps().V1().Deployments(),
kyvernoV1.ClusterPolicies(),
kyvernoV1.Policies(),
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
metricsConfig,
serverIP,
int32(webhookTimeout),
autoUpdateWebhooks,
logging.GlobalLogger(),
)
configuration, err := config.NewConfiguration(
kubeClient,
webhookCfg.UpdateWebhookChan,
)
if err != nil {
logger.Error(err, "failed to initialize configuration")
os.Exit(1)
}
openApiManager, err := openapi.NewOpenAPIController()
if err != nil {
logger.Error(err, "Failed to create openapi manager")
os.Exit(1)
}
policyCache := policycache.NewCache()
eventGenerator := event.NewEventGenerator(
dynamicClient,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
maxQueuedEvents,
logging.WithName("EventGenerator"),
)
webhookMonitor, err := webhookconfig.NewMonitor(kubeClient, logging.GlobalLogger())
if err != nil {
@@ -390,13 +450,6 @@ func main() {
os.Exit(1)
}
configuration, err := config.NewConfiguration(kubeClient, webhookCfg.UpdateWebhookChan)
if err != nil {
logger.Error(err, "failed to initialize configuration")
os.Exit(1)
}
configurationController := configcontroller.NewController(configuration, kubeKyvernoInformer.Core().V1().ConfigMaps())
// POLICY CONTROLLER
// - reconciliation policy and policy violation
// - process policy on existing resources
@@ -404,9 +457,9 @@ func main() {
policyCtrl, err := policy.NewPolicyController(
kyvernoClient,
dynamicClient,
kyvernoV1.ClusterPolicies(),
kyvernoV1.Policies(),
kyvernoV1beta1.UpdateRequests(),
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
configuration,
eventGenerator,
kubeInformer.Core().V1().Namespaces(),
@@ -419,22 +472,7 @@ func main() {
os.Exit(1)
}
urgen := webhookgenerate.NewGenerator(kyvernoClient, kyvernoV1beta1.UpdateRequests())
urc := background.NewController(
kyvernoClient,
dynamicClient,
kyvernoV1.ClusterPolicies(),
kyvernoV1.Policies(),
kyvernoV1beta1.UpdateRequests(),
kubeInformer.Core().V1().Namespaces(),
kubeKyvernoInformer.Core().V1().Pods(),
eventGenerator,
configuration,
)
policyCache := policycache.NewCache()
policyCacheController := policycachecontroller.NewController(policyCache, kyvernoV1.ClusterPolicies(), kyvernoV1.Policies())
urgen := webhookgenerate.NewGenerator(kyvernoClient, kyvernoInformer.Kyverno().V1beta1().UpdateRequests())
certRenewer, err := tls.NewCertRenewer(
metrics.ObjectClient[*corev1.Secret](
@@ -472,16 +510,13 @@ func main() {
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
)
// the webhook server runs across all instances
openAPIController := startOpenAPIController(signalCtx, logger, dynamicClient)
// WEBHOOK
// - https server to provide endpoints called based on rules defined in Mutating & Validation webhook configuration
// - reports the results based on the response from the policy engine:
// -- annotations on resources with update details on mutation JSON patches
// -- generate policy violation resource
// -- generate events on policy and resource
policyHandlers := webhookspolicy.NewHandlers(dynamicClient, openAPIController)
policyHandlers := webhookspolicy.NewHandlers(dynamicClient, openApiManager)
resourceHandlers := webhooksresource.NewHandlers(
dynamicClient,
kyvernoClient,
@@ -491,10 +526,10 @@ func main() {
kubeInformer.Core().V1().Namespaces().Lister(),
kubeInformer.Rbac().V1().RoleBindings().Lister(),
kubeInformer.Rbac().V1().ClusterRoleBindings().Lister(),
kyvernoV1beta1.UpdateRequests().Lister().UpdateRequests(config.KyvernoNamespace()),
kyvernoInformer.Kyverno().V1beta1().UpdateRequests().Lister().UpdateRequests(config.KyvernoNamespace()),
urgen,
eventGenerator,
openAPIController,
openApiManager,
admissionReports,
)
@@ -518,6 +553,7 @@ func main() {
// start them once by the leader
registerWrapperRetry := common.RetryFunc(time.Second, webhookRegistrationTimeout, webhookCfg.Register, "failed to register webhook", logger)
run := func(context.Context) {
logger := logger.WithName("leader")
if err := certRenewer.InitTLSPemPair(); err != nil {
logger.Error(err, "tls initialization error")
os.Exit(1)
@@ -564,7 +600,7 @@ func main() {
}
for i := range reportControllers {
go reportControllers[i].run(signalCtx)
go reportControllers[i].run(signalCtx, logger.WithName("controllers"))
}
}
@@ -596,26 +632,40 @@ func main() {
defer signalCancel()
<-signalCtx.Done()
}()
// create non leader controllers
nonLeaderControllers, nonLeaderBootstrap := createNonLeaderControllers(
kubeInformer,
kubeKyvernoInformer,
kyvernoInformer,
kubeClient,
kyvernoClient,
dynamicClient,
configuration,
policyCache,
eventGenerator,
openApiManager,
)
// start informers and wait for cache sync
if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
logger.Error(err, "Failed to wait for cache sync")
logger.Error(err, "failed to wait for cache sync")
os.Exit(1)
}
// warmup policy cache
if err := policyCacheController.WarmUp(); err != nil {
logger.Error(err, "Failed to warm up policy cache")
os.Exit(1)
// bootstrap non leader controllers
if nonLeaderBootstrap != nil {
if err := nonLeaderBootstrap(); err != nil {
logger.Error(err, "failed to bootstrap non leader controllers")
os.Exit(1)
}
}
// init events handlers
// start Kyverno controllers
go policyCacheController.Run(signalCtx, policycachecontroller.Workers)
go urc.Run(signalCtx, genWorkers)
go le.Run(signalCtx)
go configurationController.Run(signalCtx, configcontroller.Workers)
// start event generator
go eventGenerator.Run(signalCtx, 3)
// start leader election
go le.Run(signalCtx)
// start non leader controllers
for _, controller := range nonLeaderControllers {
go controller.run(signalCtx, logger.WithName("controllers"))
}
// start monitor (only when running in cluster)
if serverIP == "" {
go webhookMonitor.Run(signalCtx, webhookCfg, certRenewer, eventGenerator)
}
@@ -631,21 +681,6 @@ func main() {
logger.V(2).Info("Kyverno shutdown successful")
}
func startOpenAPIController(ctx context.Context, logger logr.Logger, client dclient.Interface) *openapi.Controller {
logger = logger.WithName("open-api")
openAPIController, err := openapi.NewOpenAPIController()
if err != nil {
logger.Error(err, "Failed to create openAPIController")
os.Exit(1)
}
// Sync openAPI definitions of resources
openAPISync := openapi.NewCRDSync(client, openAPIController)
// start openAPI controller, this is used in admission review
// thus is required in each instance
openAPISync.Run(ctx, 1)
return openAPIController
}
func setupReportControllers(
backgroundScan bool,
admissionReports bool,
@@ -663,8 +698,13 @@ func setupReportControllers(
kyvernoV1.Policies(),
kyvernoV1.ClusterPolicies(),
)
ctrls = append(ctrls, controller{resourceReportController, resourcereportcontroller.Workers})
ctrls = append(ctrls, controller{
ctrls = append(ctrls, newController(
resourcereportcontroller.ControllerName,
resourceReportController,
resourcereportcontroller.Workers,
))
ctrls = append(ctrls, newController(
aggregatereportcontroller.ControllerName,
aggregatereportcontroller.NewController(
kyvernoClient,
metadataFactory,
@@ -672,19 +712,21 @@ func main() {
reportsChunkSize,
),
aggregatereportcontroller.Workers,
})
))
if admissionReports {
ctrls = append(ctrls, controller{
ctrls = append(ctrls, newController(
admissionreportcontroller.ControllerName,
admissionreportcontroller.NewController(
kyvernoClient,
metadataFactory,
resourceReportController,
),
admissionreportcontroller.Workers,
})
))
}
if backgroundScan {
ctrls = append(ctrls, controller{
ctrls = append(ctrls, newController(
backgroundscancontroller.ControllerName,
backgroundscancontroller.NewController(
client,
kyvernoClient,
@@ -695,7 +737,7 @@ func main() {
resourceReportController,
),
backgroundscancontroller.Workers,
})
))
}
}
return ctrls


@@ -15,8 +15,9 @@ import (
const (
// Workers is the number of workers for this controller
Workers = 3
maxRetries = 10
Workers = 3
ControllerName = "config-controller"
maxRetries = 10
)
type controller struct {
@@ -33,14 +34,14 @@ func NewController(configuration config.Configuration, configmapInformer corev1i
c := controller{
configuration: configuration,
configmapLister: configmapInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
}
controllerutils.AddDefaultEventHandlers(logger.V(3), configmapInformer.Informer(), c.queue)
return &c
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, ControllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {


@@ -2,6 +2,4 @@ package config
import "github.com/kyverno/kyverno/pkg/logging"
const controllerName = "config-controller"
var logger = logging.WithName(controllerName)
var logger = logging.WithName(ControllerName)


@@ -19,8 +19,9 @@ import (
const (
// Workers is the number of workers for this controller
Workers = 3
maxRetries = 10
Workers = 3
ControllerName = "policycache-controller"
maxRetries = 10
)
type Controller interface {
@@ -44,7 +45,7 @@ func NewController(pcache pcache.Cache, cpolInformer kyvernov1informers.ClusterP
cache: pcache,
cpolLister: cpolInformer.Lister(),
polLister: polInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
}
controllerutils.AddDefaultEventHandlers(logger.V(3), cpolInformer.Informer(), c.queue)
controllerutils.AddDefaultEventHandlers(logger.V(3), polInformer.Informer(), c.queue)
@@ -81,7 +82,7 @@ func (c *controller) WarmUp() error {
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, ControllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {


@@ -2,6 +2,4 @@ package policycache
import "github.com/kyverno/kyverno/pkg/logging"
const controllerName = "policycache-controller"
var logger = logging.WithName(controllerName)
var logger = logging.WithName(ControllerName)


@@ -23,8 +23,9 @@ import (
const (
// Workers is the number of workers for this controller
Workers = 2
maxRetries = 10
Workers = 2
ControllerName = "admission-report-controller"
maxRetries = 10
)
type controller struct {
@@ -51,7 +52,7 @@ func NewController(
) controllers.Controller {
admrInformer := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("admissionreports"))
cadmrInformer := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("clusteradmissionreports"))
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName)
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName)
c := controller{
client: client,
admrLister: admrInformer.Lister(),
@@ -74,7 +75,7 @@ func (c *controller) Run(ctx context.Context, workers int) {
logger.Error(err, "failed to enqueue")
}
})
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, ControllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) enqueue(selector labels.Selector) error {


@@ -2,6 +2,4 @@ package admission
import "github.com/kyverno/kyverno/pkg/logging"
const controllerName = "admission-report-controller"
var logger = logging.WithName(controllerName)
var logger = logging.WithName(ControllerName)


@@ -28,8 +28,9 @@ import (
const (
// Workers is the number of workers for this controller
Workers = 1
maxRetries = 10
Workers = 1
ControllerName = "aggregate-report-controller"
maxRetries = 10
)
type controller struct {
@@ -71,7 +72,7 @@ func NewController(
cadmrLister: cadmrInformer.Lister(),
bgscanrLister: bgscanrInformer.Lister(),
cbgscanrLister: cbgscanrInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
metadataCache: metadataCache,
chunkSize: chunkSize,
}
@@ -84,7 +85,7 @@
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, ControllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) listAdmissionReports(ctx context.Context, namespace string) ([]kyvernov1alpha2.ReportInterface, error) {


@@ -2,6 +2,4 @@ package aggregate
import "github.com/kyverno/kyverno/pkg/logging"
const controllerName = "aggregate-report-controller"
var logger = logging.WithName(controllerName)
var logger = logging.WithName(ControllerName)


@@ -32,8 +32,9 @@ import (
const (
// Workers is the number of workers for this controller
Workers = 2
maxRetries = 10
Workers = 2
ControllerName = "background-scan-controller"
maxRetries = 10
)
type controller struct {
@@ -68,7 +69,7 @@ func NewController(
) controllers.Controller {
bgscanr := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("backgroundscanreports"))
cbgscanr := metadataFactory.ForResource(kyvernov1alpha2.SchemeGroupVersion.WithResource("clusterbackgroundscanreports"))
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName)
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName)
c := controller{
client: client,
kyvernoClient: kyvernoClient,
@@ -102,7 +103,7 @@ func (c *controller) Run(ctx context.Context, workers int) {
c.queue.Add(resource.Namespace + "/" + string(uid))
}
})
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, ControllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) addPolicy(obj interface{}) {


@@ -2,6 +2,4 @@ package background
import "github.com/kyverno/kyverno/pkg/logging"
const controllerName = "background-scan-controller"
var logger = logging.WithName(controllerName)
var logger = logging.WithName(ControllerName)


@@ -24,8 +24,9 @@ import (
const (
// Workers is the number of workers for this controller
Workers = 1
maxRetries = 5
Workers = 1
ControllerName = "resource-report-controller"
maxRetries = 5
)
type Resource struct {
@@ -77,7 +78,7 @@ func NewController(
client: client,
polLister: polInformer.Lister(),
cpolLister: cpolInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicWatchers: map[schema.GroupVersionResource]*watcher{},
}
controllerutils.AddDefaultEventHandlers(logger.V(3), polInformer.Informer(), c.queue)
@@ -86,7 +87,7 @@
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, ControllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) GetResourceHash(uid types.UID) (Resource, schema.GroupVersionKind, bool) {


@@ -2,6 +2,4 @@ package resource
import "github.com/kyverno/kyverno/pkg/logging"
const controllerName = "resource-report-controller"
var logger = logging.WithName(controllerName)
var logger = logging.WithName(ControllerName)


@@ -24,8 +24,10 @@ func Run(ctx context.Context, controllerName string, logger logr.Logger, queue w
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer queue.ShutDown()
if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), cacheSyncs...) {
return
if len(cacheSyncs) > 0 {
if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), cacheSyncs...) {
return
}
}
for i := 0; i < n; i++ {
wg.Add(1)
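
The guard added in this hunk changes the shared runner's behaviour: when a controller registers no informer cache sync functions, the wait is skipped and workers start immediately (presumably so controllers without informer-backed caches can reuse the same runner). Below is a minimal sketch of the same guard in isolation; waitForCachesIfAny and its package name are illustrative, not the actual helper.

package controllerutils

import (
	"k8s.io/client-go/tools/cache"
)

// waitForCachesIfAny waits for the given informer caches to sync, but skips the
// wait entirely when no cache sync functions were registered.
func waitForCachesIfAny(name string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
	if len(cacheSyncs) == 0 {
		return true // nothing to sync, workers can start immediately
	}
	return cache.WaitForNamedCacheSync(name, stopCh, cacheSyncs...)
}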


@@ -121,12 +121,9 @@ func isCRDInstalled(discoveryClient dclient.IDiscovery, kind string) bool {
if err == nil {
err = fmt.Errorf("not found")
}
logging.Error(err, "failed to retrieve CRD", "kind", kind)
return false
}
logging.V(2).Info("CRD found", "gvr", gvr.String())
return true
}