mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-30 19:35:06 +00:00
refactor: leader controllers management (#4832)
* refactor: leader controllers management Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * rename Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * fix start Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * fix deps Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * remove dead code Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
This commit is contained in:
parent
106880c8d0
commit
7849fbbc8a
8 changed files with 285 additions and 278 deletions
|
@ -3,8 +3,6 @@ package main
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
"k8s.io/client-go/tools/cache"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO: eventually move this in an util package
|
// TODO: eventually move this in an util package
|
||||||
|
@ -45,11 +43,3 @@ func startInformersAndWaitForCacheSync(ctx context.Context, informers ...informe
|
||||||
startInformers(ctx, informers...)
|
startInformers(ctx, informers...)
|
||||||
return waitForCacheSync(ctx, informers...)
|
return waitForCacheSync(ctx, informers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForInformersCacheSync(ctx context.Context, informers ...cache.SharedInformer) bool {
|
|
||||||
var hasSynced []cache.InformerSynced
|
|
||||||
for i := range informers {
|
|
||||||
hasSynced = append(hasSynced, informers[i].HasSynced)
|
|
||||||
}
|
|
||||||
return cache.WaitForCacheSync(ctx.Done(), hasSynced...)
|
|
||||||
}
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package main
|
||||||
// We currently accept the risk of exposing pprof and rely on users to protect the endpoint.
|
// We currently accept the risk of exposing pprof and rely on users to protect the endpoint.
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -338,6 +339,137 @@ func createNonLeaderControllers(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createReportControllers(
|
||||||
|
backgroundScan bool,
|
||||||
|
admissionReports bool,
|
||||||
|
client dclient.Interface,
|
||||||
|
kyvernoClient versioned.Interface,
|
||||||
|
metadataFactory metadatainformers.SharedInformerFactory,
|
||||||
|
kubeInformer kubeinformers.SharedInformerFactory,
|
||||||
|
kyvernoInformer kyvernoinformer.SharedInformerFactory,
|
||||||
|
) []controller {
|
||||||
|
var ctrls []controller
|
||||||
|
kyvernoV1 := kyvernoInformer.Kyverno().V1()
|
||||||
|
if backgroundScan || admissionReports {
|
||||||
|
resourceReportController := resourcereportcontroller.NewController(
|
||||||
|
client,
|
||||||
|
kyvernoV1.Policies(),
|
||||||
|
kyvernoV1.ClusterPolicies(),
|
||||||
|
)
|
||||||
|
ctrls = append(ctrls, newController(
|
||||||
|
resourcereportcontroller.ControllerName,
|
||||||
|
resourceReportController,
|
||||||
|
resourcereportcontroller.Workers,
|
||||||
|
))
|
||||||
|
ctrls = append(ctrls, newController(
|
||||||
|
aggregatereportcontroller.ControllerName,
|
||||||
|
aggregatereportcontroller.NewController(
|
||||||
|
kyvernoClient,
|
||||||
|
metadataFactory,
|
||||||
|
resourceReportController,
|
||||||
|
reportsChunkSize,
|
||||||
|
),
|
||||||
|
aggregatereportcontroller.Workers,
|
||||||
|
))
|
||||||
|
if admissionReports {
|
||||||
|
ctrls = append(ctrls, newController(
|
||||||
|
admissionreportcontroller.ControllerName,
|
||||||
|
admissionreportcontroller.NewController(
|
||||||
|
kyvernoClient,
|
||||||
|
metadataFactory,
|
||||||
|
resourceReportController,
|
||||||
|
),
|
||||||
|
admissionreportcontroller.Workers,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
if backgroundScan {
|
||||||
|
ctrls = append(ctrls, newController(
|
||||||
|
backgroundscancontroller.ControllerName,
|
||||||
|
backgroundscancontroller.NewController(
|
||||||
|
client,
|
||||||
|
kyvernoClient,
|
||||||
|
metadataFactory,
|
||||||
|
kyvernoV1.Policies(),
|
||||||
|
kyvernoV1.ClusterPolicies(),
|
||||||
|
kubeInformer.Core().V1().Namespaces(),
|
||||||
|
resourceReportController,
|
||||||
|
),
|
||||||
|
backgroundscancontroller.Workers,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ctrls
|
||||||
|
}
|
||||||
|
|
||||||
|
func createrLeaderControllers(
|
||||||
|
kubeInformer kubeinformers.SharedInformerFactory,
|
||||||
|
kubeKyvernoInformer kubeinformers.SharedInformerFactory,
|
||||||
|
kyvernoInformer kyvernoinformer.SharedInformerFactory,
|
||||||
|
metadataInformer metadatainformers.SharedInformerFactory,
|
||||||
|
kubeClient kubernetes.Interface,
|
||||||
|
kyvernoClient versioned.Interface,
|
||||||
|
dynamicClient dclient.Interface,
|
||||||
|
configuration config.Configuration,
|
||||||
|
metricsConfig *metrics.MetricsConfig,
|
||||||
|
eventGenerator event.Interface,
|
||||||
|
certRenewer *tls.CertRenewer,
|
||||||
|
) ([]controller, error) {
|
||||||
|
policyCtrl, err := policy.NewPolicyController(
|
||||||
|
kyvernoClient,
|
||||||
|
dynamicClient,
|
||||||
|
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
||||||
|
kyvernoInformer.Kyverno().V1().Policies(),
|
||||||
|
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
|
||||||
|
configuration,
|
||||||
|
eventGenerator,
|
||||||
|
kubeInformer.Core().V1().Namespaces(),
|
||||||
|
logging.WithName("PolicyController"),
|
||||||
|
time.Hour,
|
||||||
|
metricsConfig,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
certManager := certmanager.NewController(
|
||||||
|
kubeKyvernoInformer.Core().V1().Secrets(),
|
||||||
|
certRenewer,
|
||||||
|
)
|
||||||
|
webhookController := webhookcontroller.NewController(
|
||||||
|
metrics.ObjectClient[*corev1.Secret](
|
||||||
|
metrics.NamespacedClientQueryRecorder(metricsConfig, config.KyvernoNamespace(), "Secret", metrics.KubeClient),
|
||||||
|
kubeClient.CoreV1().Secrets(config.KyvernoNamespace()),
|
||||||
|
),
|
||||||
|
metrics.ObjectClient[*admissionregistrationv1.MutatingWebhookConfiguration](
|
||||||
|
metrics.ClusteredClientQueryRecorder(metricsConfig, "MutatingWebhookConfiguration", metrics.KubeClient),
|
||||||
|
kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations(),
|
||||||
|
),
|
||||||
|
metrics.ObjectClient[*admissionregistrationv1.ValidatingWebhookConfiguration](
|
||||||
|
metrics.ClusteredClientQueryRecorder(metricsConfig, "ValidatingWebhookConfiguration", metrics.KubeClient),
|
||||||
|
kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
|
||||||
|
),
|
||||||
|
kubeKyvernoInformer.Core().V1().Secrets(),
|
||||||
|
kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations(),
|
||||||
|
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
|
||||||
|
)
|
||||||
|
return append(
|
||||||
|
[]controller{
|
||||||
|
newController("policy-controller", policyCtrl, 2),
|
||||||
|
newController(certmanager.ControllerName, certManager, certmanager.Workers),
|
||||||
|
newController(webhookcontroller.ControllerName, webhookController, webhookcontroller.Workers),
|
||||||
|
},
|
||||||
|
createReportControllers(
|
||||||
|
backgroundScan,
|
||||||
|
admissionReports,
|
||||||
|
dynamicClient,
|
||||||
|
kyvernoClient,
|
||||||
|
metadataInformer,
|
||||||
|
kubeInformer,
|
||||||
|
kyvernoInformer,
|
||||||
|
)...,
|
||||||
|
),
|
||||||
|
nil
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// parse flags
|
// parse flags
|
||||||
if err := parseFlags(); err != nil {
|
if err := parseFlags(); err != nil {
|
||||||
|
@ -403,8 +535,6 @@ func main() {
|
||||||
kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
|
kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
|
||||||
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
|
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
|
||||||
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
|
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
|
||||||
metadataInformer := metadatainformers.NewSharedInformerFactory(metadataClient, 15*time.Minute)
|
|
||||||
|
|
||||||
webhookCfg := webhookconfig.NewRegister(
|
webhookCfg := webhookconfig.NewRegister(
|
||||||
signalCtx,
|
signalCtx,
|
||||||
clientConfig,
|
clientConfig,
|
||||||
|
@ -435,45 +565,6 @@ func main() {
|
||||||
logger.Error(err, "Failed to create openapi manager")
|
logger.Error(err, "Failed to create openapi manager")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
policyCache := policycache.NewCache()
|
|
||||||
eventGenerator := event.NewEventGenerator(
|
|
||||||
dynamicClient,
|
|
||||||
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
|
||||||
kyvernoInformer.Kyverno().V1().Policies(),
|
|
||||||
maxQueuedEvents,
|
|
||||||
logging.WithName("EventGenerator"),
|
|
||||||
)
|
|
||||||
|
|
||||||
webhookMonitor, err := webhookconfig.NewMonitor(kubeClient, logging.GlobalLogger())
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(err, "failed to initialize webhookMonitor")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// POLICY CONTROLLER
|
|
||||||
// - reconciliation policy and policy violation
|
|
||||||
// - process policy on existing resources
|
|
||||||
// - status aggregator: receives stats when a policy is applied & updates the policy status
|
|
||||||
policyCtrl, err := policy.NewPolicyController(
|
|
||||||
kyvernoClient,
|
|
||||||
dynamicClient,
|
|
||||||
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
|
||||||
kyvernoInformer.Kyverno().V1().Policies(),
|
|
||||||
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
|
|
||||||
configuration,
|
|
||||||
eventGenerator,
|
|
||||||
kubeInformer.Core().V1().Namespaces(),
|
|
||||||
logging.WithName("PolicyController"),
|
|
||||||
time.Hour,
|
|
||||||
metricsConfig,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(err, "Failed to create policy controller")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
urgen := webhookgenerate.NewGenerator(kyvernoClient, kyvernoInformer.Kyverno().V1beta1().UpdateRequests())
|
|
||||||
|
|
||||||
certRenewer, err := tls.NewCertRenewer(
|
certRenewer, err := tls.NewCertRenewer(
|
||||||
metrics.ObjectClient[*corev1.Secret](
|
metrics.ObjectClient[*corev1.Secret](
|
||||||
metrics.NamespacedClientQueryRecorder(metricsConfig, config.KyvernoNamespace(), "Secret", metrics.KubeClient),
|
metrics.NamespacedClientQueryRecorder(metricsConfig, config.KyvernoNamespace(), "Secret", metrics.KubeClient),
|
||||||
|
@ -490,148 +581,14 @@ func main() {
|
||||||
logger.Error(err, "failed to initialize CertRenewer")
|
logger.Error(err, "failed to initialize CertRenewer")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
certManager := certmanager.NewController(kubeKyvernoInformer.Core().V1().Secrets(), certRenewer)
|
policyCache := policycache.NewCache()
|
||||||
|
eventGenerator := event.NewEventGenerator(
|
||||||
webhookController := webhookcontroller.NewController(
|
|
||||||
metrics.ObjectClient[*corev1.Secret](
|
|
||||||
metrics.NamespacedClientQueryRecorder(metricsConfig, config.KyvernoNamespace(), "Secret", metrics.KubeClient),
|
|
||||||
kubeClient.CoreV1().Secrets(config.KyvernoNamespace()),
|
|
||||||
),
|
|
||||||
metrics.ObjectClient[*admissionregistrationv1.MutatingWebhookConfiguration](
|
|
||||||
metrics.ClusteredClientQueryRecorder(metricsConfig, "MutatingWebhookConfiguration", metrics.KubeClient),
|
|
||||||
kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations(),
|
|
||||||
),
|
|
||||||
metrics.ObjectClient[*admissionregistrationv1.ValidatingWebhookConfiguration](
|
|
||||||
metrics.ClusteredClientQueryRecorder(metricsConfig, "ValidatingWebhookConfiguration", metrics.KubeClient),
|
|
||||||
kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations(),
|
|
||||||
),
|
|
||||||
kubeKyvernoInformer.Core().V1().Secrets(),
|
|
||||||
kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations(),
|
|
||||||
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
|
|
||||||
)
|
|
||||||
|
|
||||||
// WEBHOOK
|
|
||||||
// - https server to provide endpoints called based on rules defined in Mutating & Validation webhook configuration
|
|
||||||
// - reports the results based on the response from the policy engine:
|
|
||||||
// -- annotations on resources with update details on mutation JSON patches
|
|
||||||
// -- generate policy violation resource
|
|
||||||
// -- generate events on policy and resource
|
|
||||||
policyHandlers := webhookspolicy.NewHandlers(dynamicClient, openApiManager)
|
|
||||||
resourceHandlers := webhooksresource.NewHandlers(
|
|
||||||
dynamicClient,
|
dynamicClient,
|
||||||
kyvernoClient,
|
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
||||||
configuration,
|
kyvernoInformer.Kyverno().V1().Policies(),
|
||||||
metricsConfig,
|
maxQueuedEvents,
|
||||||
policyCache,
|
logging.WithName("EventGenerator"),
|
||||||
kubeInformer.Core().V1().Namespaces().Lister(),
|
|
||||||
kubeInformer.Rbac().V1().RoleBindings().Lister(),
|
|
||||||
kubeInformer.Rbac().V1().ClusterRoleBindings().Lister(),
|
|
||||||
kyvernoInformer.Kyverno().V1beta1().UpdateRequests().Lister().UpdateRequests(config.KyvernoNamespace()),
|
|
||||||
urgen,
|
|
||||||
eventGenerator,
|
|
||||||
openApiManager,
|
|
||||||
admissionReports,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister()
|
|
||||||
server := webhooks.NewServer(
|
|
||||||
policyHandlers,
|
|
||||||
resourceHandlers,
|
|
||||||
func() ([]byte, []byte, error) {
|
|
||||||
secret, err := secretLister.Secrets(config.KyvernoNamespace()).Get(tls.GenerateTLSPairSecretName())
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
return secret.Data[corev1.TLSCertKey], secret.Data[corev1.TLSPrivateKeyKey], nil
|
|
||||||
},
|
|
||||||
configuration,
|
|
||||||
webhookCfg,
|
|
||||||
webhookMonitor,
|
|
||||||
)
|
|
||||||
|
|
||||||
// wrap all controllers that need leaderelection
|
|
||||||
// start them once by the leader
|
|
||||||
registerWrapperRetry := common.RetryFunc(time.Second, webhookRegistrationTimeout, webhookCfg.Register, "failed to register webhook", logger)
|
|
||||||
run := func(context.Context) {
|
|
||||||
logger := logger.WithName("leader")
|
|
||||||
if err := certRenewer.InitTLSPemPair(); err != nil {
|
|
||||||
logger.Error(err, "tls initialization error")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
// wait for cache to be synced before use it
|
|
||||||
if !waitForInformersCacheSync(signalCtx,
|
|
||||||
kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(),
|
|
||||||
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(),
|
|
||||||
) {
|
|
||||||
// TODO: shall we just exit ?
|
|
||||||
logger.Info("failed to wait for cache sync")
|
|
||||||
}
|
|
||||||
|
|
||||||
// validate the ConfigMap format
|
|
||||||
if err := webhookCfg.ValidateWebhookConfigurations(config.KyvernoNamespace(), config.KyvernoConfigMapName()); err != nil {
|
|
||||||
logger.Error(err, "invalid format of the Kyverno init ConfigMap, please correct the format of 'data.webhooks'")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
if autoUpdateWebhooks {
|
|
||||||
go webhookCfg.UpdateWebhookConfigurations(configuration)
|
|
||||||
}
|
|
||||||
if registrationErr := registerWrapperRetry(); registrationErr != nil {
|
|
||||||
logger.Error(err, "Timeout registering admission control webhooks")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
webhookCfg.UpdateWebhookChan <- true
|
|
||||||
go certManager.Run(signalCtx, certmanager.Workers)
|
|
||||||
go policyCtrl.Run(signalCtx, 2)
|
|
||||||
go webhookController.Run(signalCtx, webhookcontroller.Workers)
|
|
||||||
|
|
||||||
reportControllers := setupReportControllers(
|
|
||||||
backgroundScan,
|
|
||||||
admissionReports,
|
|
||||||
dynamicClient,
|
|
||||||
kyvernoClient,
|
|
||||||
metadataInformer,
|
|
||||||
kubeInformer,
|
|
||||||
kyvernoInformer,
|
|
||||||
)
|
|
||||||
startInformers(signalCtx, metadataInformer)
|
|
||||||
if !checkCacheSync(metadataInformer.WaitForCacheSync(signalCtx.Done())) {
|
|
||||||
// TODO: shall we just exit ?
|
|
||||||
logger.Info("failed to wait for cache sync")
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range reportControllers {
|
|
||||||
go reportControllers[i].run(signalCtx, logger.WithName("controllers"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// cleanup Kyverno managed resources followed by webhook shutdown
|
|
||||||
// No need to exit here, as server.Stop(ctx) closes the cleanUp
|
|
||||||
// chan, thus the main process exits.
|
|
||||||
stop := func() {
|
|
||||||
c, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
server.Stop(c)
|
|
||||||
}
|
|
||||||
|
|
||||||
le, err := leaderelection.New(
|
|
||||||
logger.WithName("leader-election"),
|
|
||||||
"kyverno",
|
|
||||||
config.KyvernoNamespace(),
|
|
||||||
kubeClientLeaderElection,
|
|
||||||
config.KyvernoPodName(),
|
|
||||||
run,
|
|
||||||
stop,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(err, "failed to elect a leader")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// cancel leader election context on shutdown signals
|
|
||||||
go func() {
|
|
||||||
defer signalCancel()
|
|
||||||
<-signalCtx.Done()
|
|
||||||
}()
|
|
||||||
// create non leader controllers
|
// create non leader controllers
|
||||||
nonLeaderControllers, nonLeaderBootstrap := createNonLeaderControllers(
|
nonLeaderControllers, nonLeaderBootstrap := createNonLeaderControllers(
|
||||||
kubeInformer,
|
kubeInformer,
|
||||||
|
@ -647,7 +604,7 @@ func main() {
|
||||||
)
|
)
|
||||||
// start informers and wait for cache sync
|
// start informers and wait for cache sync
|
||||||
if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
|
if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
|
||||||
logger.Error(err, "failed to wait for cache sync")
|
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
// bootstrap non leader controllers
|
// bootstrap non leader controllers
|
||||||
|
@ -659,86 +616,150 @@ func main() {
|
||||||
}
|
}
|
||||||
// start event generator
|
// start event generator
|
||||||
go eventGenerator.Run(signalCtx, 3)
|
go eventGenerator.Run(signalCtx, 3)
|
||||||
// start leader election
|
|
||||||
go le.Run(signalCtx)
|
|
||||||
// start non leader controllers
|
// start non leader controllers
|
||||||
for _, controller := range nonLeaderControllers {
|
for _, controller := range nonLeaderControllers {
|
||||||
go controller.run(signalCtx, logger.WithName("controllers"))
|
go controller.run(signalCtx, logger.WithName("controllers"))
|
||||||
}
|
}
|
||||||
|
// setup leader election
|
||||||
|
le, err := leaderelection.New(
|
||||||
|
logger.WithName("leader-election"),
|
||||||
|
"kyverno",
|
||||||
|
config.KyvernoNamespace(),
|
||||||
|
kubeClientLeaderElection,
|
||||||
|
config.KyvernoPodName(),
|
||||||
|
func(ctx context.Context) {
|
||||||
|
logger := logger.WithName("leader")
|
||||||
|
// when losing the lead we just terminate the pod
|
||||||
|
defer signalCancel()
|
||||||
|
// init tls secret
|
||||||
|
if err := certRenewer.InitTLSPemPair(); err != nil {
|
||||||
|
logger.Error(err, "tls initialization error")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
// validate config
|
||||||
|
if err := webhookCfg.ValidateWebhookConfigurations(config.KyvernoNamespace(), config.KyvernoConfigMapName()); err != nil {
|
||||||
|
logger.Error(err, "invalid format of the Kyverno init ConfigMap, please correct the format of 'data.webhooks'")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
// create leader factories
|
||||||
|
kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
|
||||||
|
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
|
||||||
|
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
|
||||||
|
metadataInformer := metadatainformers.NewSharedInformerFactory(metadataClient, 15*time.Minute)
|
||||||
|
// create leader controllers
|
||||||
|
leaderControllers, err := createrLeaderControllers(
|
||||||
|
kubeInformer,
|
||||||
|
kubeKyvernoInformer,
|
||||||
|
kyvernoInformer,
|
||||||
|
metadataInformer,
|
||||||
|
kubeClient,
|
||||||
|
kyvernoClient,
|
||||||
|
dynamicClient,
|
||||||
|
configuration,
|
||||||
|
metricsConfig,
|
||||||
|
eventGenerator,
|
||||||
|
certRenewer,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err, "failed to create leader controllers")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
// start informers and wait for cache sync
|
||||||
|
if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
|
||||||
|
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
startInformers(signalCtx, metadataInformer)
|
||||||
|
if !checkCacheSync(metadataInformer.WaitForCacheSync(signalCtx.Done())) {
|
||||||
|
// TODO: shall we just exit ?
|
||||||
|
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
|
||||||
|
}
|
||||||
|
// bootstrap
|
||||||
|
if autoUpdateWebhooks {
|
||||||
|
go webhookCfg.UpdateWebhookConfigurations(configuration)
|
||||||
|
}
|
||||||
|
registerWrapperRetry := common.RetryFunc(time.Second, webhookRegistrationTimeout, webhookCfg.Register, "failed to register webhook", logger)
|
||||||
|
if err := registerWrapperRetry(); err != nil {
|
||||||
|
logger.Error(err, "timeout registering admission control webhooks")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
webhookCfg.UpdateWebhookChan <- true
|
||||||
|
// start leader controllers
|
||||||
|
for _, controller := range leaderControllers {
|
||||||
|
go controller.run(signalCtx, logger.WithName("controllers"))
|
||||||
|
}
|
||||||
|
// wait until we loose the lead (or signal context is canceled)
|
||||||
|
<-ctx.Done()
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err, "failed to initialize leader election")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
// start leader election
|
||||||
|
go le.Run(signalCtx)
|
||||||
|
// create monitor
|
||||||
|
webhookMonitor, err := webhookconfig.NewMonitor(kubeClient, logging.GlobalLogger())
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err, "failed to initialize webhookMonitor")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
// start monitor (only when running in cluster)
|
// start monitor (only when running in cluster)
|
||||||
if serverIP == "" {
|
if serverIP == "" {
|
||||||
go webhookMonitor.Run(signalCtx, webhookCfg, certRenewer, eventGenerator)
|
go webhookMonitor.Run(signalCtx, webhookCfg, certRenewer, eventGenerator)
|
||||||
}
|
}
|
||||||
|
// create webhooks server
|
||||||
// verifies if the admission control is enabled and active
|
urgen := webhookgenerate.NewGenerator(
|
||||||
|
kyvernoClient,
|
||||||
|
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
|
||||||
|
)
|
||||||
|
policyHandlers := webhookspolicy.NewHandlers(
|
||||||
|
dynamicClient,
|
||||||
|
openApiManager,
|
||||||
|
)
|
||||||
|
resourceHandlers := webhooksresource.NewHandlers(
|
||||||
|
dynamicClient,
|
||||||
|
kyvernoClient,
|
||||||
|
configuration,
|
||||||
|
metricsConfig,
|
||||||
|
policyCache,
|
||||||
|
kubeInformer.Core().V1().Namespaces().Lister(),
|
||||||
|
kubeInformer.Rbac().V1().RoleBindings().Lister(),
|
||||||
|
kubeInformer.Rbac().V1().ClusterRoleBindings().Lister(),
|
||||||
|
kyvernoInformer.Kyverno().V1beta1().UpdateRequests().Lister().UpdateRequests(config.KyvernoNamespace()),
|
||||||
|
urgen,
|
||||||
|
eventGenerator,
|
||||||
|
openApiManager,
|
||||||
|
admissionReports,
|
||||||
|
)
|
||||||
|
secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister()
|
||||||
|
server := webhooks.NewServer(
|
||||||
|
policyHandlers,
|
||||||
|
resourceHandlers,
|
||||||
|
func() ([]byte, []byte, error) {
|
||||||
|
secret, err := secretLister.Secrets(config.KyvernoNamespace()).Get(tls.GenerateTLSPairSecretName())
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return secret.Data[corev1.TLSCertKey], secret.Data[corev1.TLSPrivateKeyKey], nil
|
||||||
|
},
|
||||||
|
configuration,
|
||||||
|
webhookCfg,
|
||||||
|
webhookMonitor,
|
||||||
|
)
|
||||||
|
// start informers and wait for cache sync
|
||||||
|
// we need to call start again because we potentially registered new informers
|
||||||
|
if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
|
||||||
|
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
// start webhooks server
|
||||||
server.Run(signalCtx.Done())
|
server.Run(signalCtx.Done())
|
||||||
|
// wait for termination signal
|
||||||
<-signalCtx.Done()
|
<-signalCtx.Done()
|
||||||
|
// wait for server cleanup
|
||||||
// resource cleanup
|
|
||||||
// remove webhook configurations
|
|
||||||
<-server.Cleanup()
|
<-server.Cleanup()
|
||||||
|
// say goodbye...
|
||||||
logger.V(2).Info("Kyverno shutdown successful")
|
logger.V(2).Info("Kyverno shutdown successful")
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupReportControllers(
|
|
||||||
backgroundScan bool,
|
|
||||||
admissionReports bool,
|
|
||||||
client dclient.Interface,
|
|
||||||
kyvernoClient versioned.Interface,
|
|
||||||
metadataFactory metadatainformers.SharedInformerFactory,
|
|
||||||
kubeInformer kubeinformers.SharedInformerFactory,
|
|
||||||
kyvernoInformer kyvernoinformer.SharedInformerFactory,
|
|
||||||
) []controller {
|
|
||||||
var ctrls []controller
|
|
||||||
kyvernoV1 := kyvernoInformer.Kyverno().V1()
|
|
||||||
if backgroundScan || admissionReports {
|
|
||||||
resourceReportController := resourcereportcontroller.NewController(
|
|
||||||
client,
|
|
||||||
kyvernoV1.Policies(),
|
|
||||||
kyvernoV1.ClusterPolicies(),
|
|
||||||
)
|
|
||||||
ctrls = append(ctrls, newController(
|
|
||||||
resourcereportcontroller.ControllerName,
|
|
||||||
resourceReportController,
|
|
||||||
resourcereportcontroller.Workers,
|
|
||||||
))
|
|
||||||
ctrls = append(ctrls, newController(
|
|
||||||
aggregatereportcontroller.ControllerName,
|
|
||||||
aggregatereportcontroller.NewController(
|
|
||||||
kyvernoClient,
|
|
||||||
metadataFactory,
|
|
||||||
resourceReportController,
|
|
||||||
reportsChunkSize,
|
|
||||||
),
|
|
||||||
aggregatereportcontroller.Workers,
|
|
||||||
))
|
|
||||||
if admissionReports {
|
|
||||||
ctrls = append(ctrls, newController(
|
|
||||||
admissionreportcontroller.ControllerName,
|
|
||||||
admissionreportcontroller.NewController(
|
|
||||||
kyvernoClient,
|
|
||||||
metadataFactory,
|
|
||||||
resourceReportController,
|
|
||||||
),
|
|
||||||
admissionreportcontroller.Workers,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
if backgroundScan {
|
|
||||||
ctrls = append(ctrls, newController(
|
|
||||||
backgroundscancontroller.ControllerName,
|
|
||||||
backgroundscancontroller.NewController(
|
|
||||||
client,
|
|
||||||
kyvernoClient,
|
|
||||||
metadataFactory,
|
|
||||||
kyvernoV1.Policies(),
|
|
||||||
kyvernoV1.ClusterPolicies(),
|
|
||||||
kubeInformer.Core().V1().Namespaces(),
|
|
||||||
resourceReportController,
|
|
||||||
),
|
|
||||||
backgroundscancontroller.Workers,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ctrls
|
|
||||||
}
|
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -349,7 +349,6 @@ require (
|
||||||
golang.org/x/term v0.0.0-20220919170432-7a66f970e087 // indirect
|
golang.org/x/term v0.0.0-20220919170432-7a66f970e087 // indirect
|
||||||
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
|
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
|
||||||
golang.org/x/tools v0.1.12 // indirect
|
golang.org/x/tools v0.1.12 // indirect
|
||||||
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
|
|
||||||
google.golang.org/api v0.98.0 // indirect
|
google.golang.org/api v0.98.0 // indirect
|
||||||
google.golang.org/appengine v1.6.7 // indirect
|
google.golang.org/appengine v1.6.7 // indirect
|
||||||
google.golang.org/genproto v0.0.0-20220930163606-c98284e70a91 // indirect
|
google.golang.org/genproto v0.0.0-20220930163606-c98284e70a91 // indirect
|
||||||
|
|
3
go.sum
3
go.sum
|
@ -632,7 +632,6 @@ github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E
|
||||||
github.com/esimonov/ifshort v1.0.2/go.mod h1:yZqNJUrNn20K8Q9n2CrjTKYyVEmX209Hgu+M1LBpeZE=
|
github.com/esimonov/ifshort v1.0.2/go.mod h1:yZqNJUrNn20K8Q9n2CrjTKYyVEmX209Hgu+M1LBpeZE=
|
||||||
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE=
|
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE=
|
||||||
github.com/ettle/strcase v0.1.1/go.mod h1:hzDLsPC7/lwKyBOywSHEP89nt2pDgdy+No1NBA9o9VY=
|
github.com/ettle/strcase v0.1.1/go.mod h1:hzDLsPC7/lwKyBOywSHEP89nt2pDgdy+No1NBA9o9VY=
|
||||||
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
|
|
||||||
github.com/evanphx/json-patch v4.0.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
github.com/evanphx/json-patch v4.0.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||||
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||||
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||||
|
@ -2722,7 +2721,6 @@ golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNq
|
||||||
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
|
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
|
||||||
gomodules.xyz/jsonpatch/v2 v2.1.0/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3mwe7XcUU=
|
gomodules.xyz/jsonpatch/v2 v2.1.0/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3mwe7XcUU=
|
||||||
gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
|
gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
|
||||||
gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
|
|
||||||
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
|
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
|
||||||
gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
|
gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
|
||||||
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
|
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
|
||||||
|
@ -3131,7 +3129,6 @@ k8s.io/klog v0.2.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||||
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||||
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||||
k8s.io/klog v0.4.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
|
k8s.io/klog v0.4.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
|
||||||
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
|
|
||||||
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
|
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
|
||||||
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
|
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
|
||||||
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
|
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
|
||||||
|
|
|
@ -16,8 +16,11 @@ import (
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Workers is the number of workers for this controller
|
const (
|
||||||
const Workers = 1
|
// Workers is the number of workers for this controller
|
||||||
|
Workers = 1
|
||||||
|
ControllerName = "certmanager-controller"
|
||||||
|
)
|
||||||
|
|
||||||
type controller struct {
|
type controller struct {
|
||||||
renewer *tls.CertRenewer
|
renewer *tls.CertRenewer
|
||||||
|
|
|
@ -2,6 +2,4 @@ package certmanager
|
||||||
|
|
||||||
import "github.com/kyverno/kyverno/pkg/logging"
|
import "github.com/kyverno/kyverno/pkg/logging"
|
||||||
|
|
||||||
const controllerName = "certmanager-controller"
|
var logger = logging.WithName(ControllerName)
|
||||||
|
|
||||||
var logger = logging.WithName(controllerName)
|
|
||||||
|
|
|
@ -23,8 +23,9 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Workers is the number of workers for this controller
|
// Workers is the number of workers for this controller
|
||||||
Workers = 2
|
Workers = 2
|
||||||
maxRetries = 10
|
ControllerName = "webhook-ca-controller"
|
||||||
|
maxRetries = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
type controller struct {
|
type controller struct {
|
||||||
|
@ -52,7 +53,7 @@ func NewController(
|
||||||
mwcInformer admissionregistrationv1informers.MutatingWebhookConfigurationInformer,
|
mwcInformer admissionregistrationv1informers.MutatingWebhookConfigurationInformer,
|
||||||
vwcInformer admissionregistrationv1informers.ValidatingWebhookConfigurationInformer,
|
vwcInformer admissionregistrationv1informers.ValidatingWebhookConfigurationInformer,
|
||||||
) controllers.Controller {
|
) controllers.Controller {
|
||||||
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName)
|
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName)
|
||||||
c := controller{
|
c := controller{
|
||||||
secretClient: secretClient,
|
secretClient: secretClient,
|
||||||
mwcClient: mwcClient,
|
mwcClient: mwcClient,
|
||||||
|
@ -86,7 +87,7 @@ func NewController(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) Run(ctx context.Context, workers int) {
|
func (c *controller) Run(ctx context.Context, workers int) {
|
||||||
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
|
controllerutils.Run(ctx, ControllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) enqueue(obj *corev1.Secret) error {
|
func (c *controller) enqueue(obj *corev1.Secret) error {
|
||||||
|
|
|
@ -2,6 +2,4 @@ package background
|
||||||
|
|
||||||
import "sigs.k8s.io/controller-runtime/pkg/log"
|
import "sigs.k8s.io/controller-runtime/pkg/log"
|
||||||
|
|
||||||
const controllerName = "webhook-ca-controller"
|
var logger = log.Log.WithName(ControllerName)
|
||||||
|
|
||||||
var logger = log.Log.WithName(controllerName)
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue