mirror of
https://github.com/kyverno/kyverno.git
synced 2024-12-14 11:57:48 +00:00
b54e6230c5
* refactor: make events controller shutdown graceful Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * nit Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * drain Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * refactor: events controller Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * exception Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> * remove queue Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> --------- Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com> Co-authored-by: shuting <shuting@nirmata.com>
226 lines
7.1 KiB
Go
226 lines
7.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/kyverno/kyverno/cmd/internal"
|
|
"github.com/kyverno/kyverno/pkg/background"
|
|
"github.com/kyverno/kyverno/pkg/client/clientset/versioned"
|
|
kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions"
|
|
"github.com/kyverno/kyverno/pkg/clients/dclient"
|
|
"github.com/kyverno/kyverno/pkg/config"
|
|
policymetricscontroller "github.com/kyverno/kyverno/pkg/controllers/metrics/policy"
|
|
engineapi "github.com/kyverno/kyverno/pkg/engine/api"
|
|
"github.com/kyverno/kyverno/pkg/engine/apicall"
|
|
"github.com/kyverno/kyverno/pkg/engine/jmespath"
|
|
"github.com/kyverno/kyverno/pkg/event"
|
|
"github.com/kyverno/kyverno/pkg/leaderelection"
|
|
"github.com/kyverno/kyverno/pkg/logging"
|
|
"github.com/kyverno/kyverno/pkg/metrics"
|
|
"github.com/kyverno/kyverno/pkg/policy"
|
|
kubeinformers "k8s.io/client-go/informers"
|
|
kyamlopenapi "sigs.k8s.io/kustomize/kyaml/openapi"
|
|
)
|
|
|
|
// resyncPeriod is the resync interval passed to every shared informer
// factory created in this file.
const resyncPeriod = 15 * time.Minute
|
|
|
|
func createrLeaderControllers(
|
|
eng engineapi.Engine,
|
|
genWorkers int,
|
|
kubeInformer kubeinformers.SharedInformerFactory,
|
|
kyvernoInformer kyvernoinformer.SharedInformerFactory,
|
|
kyvernoClient versioned.Interface,
|
|
dynamicClient dclient.Interface,
|
|
configuration config.Configuration,
|
|
metricsConfig metrics.MetricsConfigManager,
|
|
eventGenerator event.Interface,
|
|
jp jmespath.Interface,
|
|
backgroundScanInterval time.Duration,
|
|
) ([]internal.Controller, error) {
|
|
policyCtrl, err := policy.NewPolicyController(
|
|
kyvernoClient,
|
|
dynamicClient,
|
|
eng,
|
|
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
|
kyvernoInformer.Kyverno().V1().Policies(),
|
|
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
|
|
configuration,
|
|
eventGenerator,
|
|
kubeInformer.Core().V1().Namespaces(),
|
|
logging.WithName("PolicyController"),
|
|
backgroundScanInterval,
|
|
metricsConfig,
|
|
jp,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
backgroundController := background.NewController(
|
|
kyvernoClient,
|
|
dynamicClient,
|
|
eng,
|
|
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
|
kyvernoInformer.Kyverno().V1().Policies(),
|
|
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
|
|
kubeInformer.Core().V1().Namespaces(),
|
|
eventGenerator,
|
|
configuration,
|
|
jp,
|
|
)
|
|
return []internal.Controller{
|
|
internal.NewController("policy-controller", policyCtrl, 2),
|
|
internal.NewController("background-controller", backgroundController, genWorkers),
|
|
}, err
|
|
}
|
|
|
|
func main() {
|
|
var (
|
|
genWorkers int
|
|
maxQueuedEvents int
|
|
omitEvents string
|
|
maxAPICallResponseLength int64
|
|
)
|
|
flagset := flag.NewFlagSet("updaterequest-controller", flag.ExitOnError)
|
|
flagset.IntVar(&genWorkers, "genWorkers", 10, "Workers for the background controller.")
|
|
flagset.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.")
|
|
flagset.StringVar(&omitEvents, "omit-events", "", "Set this flag to a comma sperated list of PolicyViolation, PolicyApplied, PolicyError, PolicySkipped to disable events, e.g. --omit-events=PolicyApplied,PolicyViolation")
|
|
flagset.Int64Var(&maxAPICallResponseLength, "maxAPICallResponseLength", 2*1000*1000, "Maximum allowed response size from API Calls. A value of 0 bypasses checks (not recommended).")
|
|
|
|
// config
|
|
appConfig := internal.NewConfiguration(
|
|
internal.WithProfiling(),
|
|
internal.WithMetrics(),
|
|
internal.WithTracing(),
|
|
internal.WithKubeconfig(),
|
|
internal.WithPolicyExceptions(),
|
|
internal.WithConfigMapCaching(),
|
|
internal.WithDeferredLoading(),
|
|
internal.WithRegistryClient(),
|
|
internal.WithLeaderElection(),
|
|
internal.WithKyvernoClient(),
|
|
internal.WithDynamicClient(),
|
|
internal.WithKyvernoDynamicClient(),
|
|
internal.WithFlagSets(flagset),
|
|
)
|
|
// parse flags
|
|
internal.ParseFlags(appConfig)
|
|
// setup
|
|
signalCtx, setup, sdown := internal.Setup(appConfig, "kyverno-background-controller", false)
|
|
defer sdown()
|
|
|
|
var err error
|
|
bgscanInterval := time.Hour
|
|
val := os.Getenv("BACKGROUND_SCAN_INTERVAL")
|
|
if val != "" {
|
|
if bgscanInterval, err = time.ParseDuration(val); err != nil {
|
|
setup.Logger.Error(err, "failed to set the background scan interval")
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
setup.Logger.V(2).Info("setting the background scan interval", "value", bgscanInterval.String())
|
|
|
|
// THIS IS AN UGLY FIX
|
|
// ELSE KYAML IS NOT THREAD SAFE
|
|
kyamlopenapi.Schema()
|
|
// informer factories
|
|
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
|
|
emitEventsValues := strings.Split(omitEvents, ",")
|
|
if omitEvents == "" {
|
|
emitEventsValues = []string{}
|
|
}
|
|
eventGenerator := event.NewEventGenerator(
|
|
setup.KyvernoDynamicClient,
|
|
logging.WithName("EventGenerator"),
|
|
emitEventsValues...,
|
|
)
|
|
// this controller only subscribe to events, nothing is returned...
|
|
var wg sync.WaitGroup
|
|
policymetricscontroller.NewController(
|
|
setup.MetricsManager,
|
|
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
|
|
kyvernoInformer.Kyverno().V1().Policies(),
|
|
&wg,
|
|
)
|
|
engine := internal.NewEngine(
|
|
signalCtx,
|
|
setup.Logger,
|
|
setup.Configuration,
|
|
setup.MetricsConfiguration,
|
|
setup.Jp,
|
|
setup.KyvernoDynamicClient,
|
|
setup.RegistryClient,
|
|
setup.ImageVerifyCacheClient,
|
|
setup.KubeClient,
|
|
setup.KyvernoClient,
|
|
setup.RegistrySecretLister,
|
|
apicall.NewAPICallConfiguration(maxAPICallResponseLength),
|
|
)
|
|
// start informers and wait for cache sync
|
|
if !internal.StartInformersAndWaitForCacheSync(signalCtx, setup.Logger, kyvernoInformer) {
|
|
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
|
|
os.Exit(1)
|
|
}
|
|
// start event generator
|
|
go eventGenerator.Run(signalCtx, event.Workers, &wg)
|
|
// setup leader election
|
|
le, err := leaderelection.New(
|
|
setup.Logger.WithName("leader-election"),
|
|
"kyverno-background-controller",
|
|
config.KyvernoNamespace(),
|
|
setup.LeaderElectionClient,
|
|
config.KyvernoPodName(),
|
|
internal.LeaderElectionRetryPeriod(),
|
|
func(ctx context.Context) {
|
|
logger := setup.Logger.WithName("leader")
|
|
// create leader factories
|
|
kubeInformer := kubeinformers.NewSharedInformerFactory(setup.KubeClient, resyncPeriod)
|
|
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
|
|
// create leader controllers
|
|
leaderControllers, err := createrLeaderControllers(
|
|
engine,
|
|
genWorkers,
|
|
kubeInformer,
|
|
kyvernoInformer,
|
|
setup.KyvernoClient,
|
|
setup.KyvernoDynamicClient,
|
|
setup.Configuration,
|
|
setup.MetricsManager,
|
|
eventGenerator,
|
|
setup.Jp,
|
|
bgscanInterval,
|
|
)
|
|
if err != nil {
|
|
logger.Error(err, "failed to create leader controllers")
|
|
os.Exit(1)
|
|
}
|
|
// start informers and wait for cache sync
|
|
if !internal.StartInformersAndWaitForCacheSync(signalCtx, logger, kyvernoInformer, kubeInformer) {
|
|
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
|
|
os.Exit(1)
|
|
}
|
|
// start leader controllers
|
|
var wg sync.WaitGroup
|
|
for _, controller := range leaderControllers {
|
|
controller.Run(signalCtx, logger.WithName("controllers"), &wg)
|
|
}
|
|
// wait all controllers shut down
|
|
wg.Wait()
|
|
},
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
setup.Logger.Error(err, "failed to initialize leader election")
|
|
os.Exit(1)
|
|
}
|
|
// start leader election
|
|
le.Run(signalCtx)
|
|
wg.Wait()
|
|
}
|