1
0
Fork 0
mirror of https://github.com/kyverno/policy-reporter.git synced 2024-12-15 17:50:58 +00:00
policy-reporter/cmd/run.go

238 lines
6.1 KiB
Go
Raw Normal View History

package cmd
import (
"context"
"errors"
"flag"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"github.com/kyverno/policy-reporter/pkg/api"
v1 "github.com/kyverno/policy-reporter/pkg/api/v1"
v2 "github.com/kyverno/policy-reporter/pkg/api/v2"
"github.com/kyverno/policy-reporter/pkg/config"
"github.com/kyverno/policy-reporter/pkg/database"
"github.com/kyverno/policy-reporter/pkg/listener"
)
// newRunCMD builds the "run" cobra command.
//
// The command's RunE loads the configuration, creates the Kubernetes REST
// config, assembles the HTTP server options (health check plus the optional
// REST API, metrics and pprof profiling), optionally sets up leader election,
// and then starts the HTTP server, the policy report informer client and the
// secret informer inside an errgroup. RunE blocks in g.Wait() until every
// goroutine of the group has returned and yields the group's first error.
//
// version is stored on the loaded configuration as c.Version.
func newRunCMD(version string) *cobra.Command {
	cmd := &cobra.Command{
		Use:   "run",
		Short: "Run PolicyReporter Watcher & HTTP Metrics Server",
		RunE: func(cmd *cobra.Command, args []string) error {
			c, err := config.Load(cmd)
			if err != nil {
				return err
			}
			c.Version = version

			// Use the configured kubeconfig file when one is set, otherwise
			// fall back to the in-cluster configuration.
			var k8sConfig *rest.Config
			if c.K8sClient.Kubeconfig != "" {
				k8sConfig, err = clientcmd.BuildConfigFromFlags("", c.K8sClient.Kubeconfig)
			} else {
				k8sConfig, err = rest.InClusterConfig()
			}
			if err != nil {
				return err
			}

			k8sConfig.QPS = c.K8sClient.QPS
			k8sConfig.Burst = c.K8sClient.Burst

			readinessProbe := config.NewReadinessProbe(c)
			defer readinessProbe.Close()

			resolver := config.NewResolver(c, k8sConfig)

			logger, err := resolver.Logger()
			if err != nil {
				return err
			}

			client, err := resolver.PolicyReportClient()
			if err != nil {
				return err
			}

			secretInformer, err := resolver.SecretInformer()
			if err != nil {
				return err
			}

			g := &errgroup.Group{}

			// store stays nil unless the REST API is enabled below; every use
			// of it must therefore be guarded by c.REST.Enabled.
			var store *database.Store

			servOptions := []api.ServerOption{
				api.WithPort(c.API.Port),
				api.WithHealthChecks([]api.HealthCheck{
					func() error {
						if !client.HasSynced() {
							return errors.New("informer not ready")
						}

						return nil
					},
				}),
			}

			if c.REST.Enabled {
				db := resolver.Database()
				if db == nil {
					return errors.New("unable to create database connection")
				}
				defer db.Close()

				store, err = resolver.Store(db)
				if err != nil {
					return err
				}

				nsClient, err := resolver.NamespaceClient()
				if err != nil {
					return err
				}

				// Without leader election, or with SQLite, the database is
				// prepared right away; otherwise this is deferred to the
				// leader election start callback.
				if !c.LeaderElection.Enabled || store.IsSQLite() {
					store.PrepareDatabase(cmd.Context())
					resolver.RegisterStoreListener(cmd.Context(), store)
				}

				logger.Info("REST api enabled")
				servOptions = append(servOptions, v1.WithAPI(store, resolver.TargetClients(), resolver.ViolationsReporter()), v2.WithAPI(store, nsClient, c.Targets))
			}

			if c.Metrics.Enabled {
				logger.Info("metrics enabled")
				resolver.RegisterMetricsListener()
				servOptions = append(servOptions, api.WithMetrics())
			}

			if c.Profiling.Enabled {
				logger.Info("pprof profiling enabled")
				servOptions = append(servOptions, api.WithProfiling())
			}

			if !resolver.ResultCache().Shared() {
				logger.Debug("register new result listener")
				resolver.RegisterNewResultsListener()
			}

			if resolver.EnableLeaderElection() {
				elector, err := resolver.LeaderElectionClient()
				if err != nil {
					return err
				}

				elector.RegisterOnStart(func(ctx context.Context) {
					logger.Info("started leadership")

					if c.REST.Enabled && !store.IsSQLite() {
						store.PrepareDatabase(cmd.Context())

						logger.Debug("register database persistence")
						resolver.RegisterStoreListener(ctx, store)

						// Stopping the client makes the informer loop below
						// start it again.
						if readinessProbe.Running() {
							logger.Debug("trigger informer restart")
							client.Stop()
						}
					}

					resolver.RegisterSendResultListener()

					readinessProbe.Ready()
				}).RegisterOnNew(func(currentID, lockID string) {
					if currentID != lockID {
						logger.Info("leadership", zap.String("leader", currentID))
						readinessProbe.Ready()
					}
				}).RegisterOnStop(func() {
					logger.Info("stopped leadership")

					// Same guard as in the start callback: store is nil when
					// the REST API is disabled, and in that case no store
					// listener was registered, so there is nothing to remove.
					if c.REST.Enabled && !store.IsSQLite() {
						resolver.EventPublisher().UnregisterListener(listener.Store)
					}

					if resolver.HasTargets() {
						resolver.UnregisterSendResultListener()
					}
				})

				g.Go(func() error {
					return elector.Run(cmd.Context())
				})
			} else {
				resolver.RegisterSendResultListener()
				readinessProbe.Ready()
			}

			server, err := resolver.Server(cmd.Context(), servOptions)
			if err != nil {
				return err
			}

			g.Go(server.Start)

			g.Go(func() error {
				logger.Info("wait policy informer")
				readinessProbe.Wait()

				logger.Info("start client", zap.Int("worker", c.WorkerCount))

				// Run the informer client in an endless loop: whenever Run
				// returns, with or without an error, it is started again with
				// a fresh stop channel.
				for {
					stop := make(chan struct{})
					if err := client.Run(c.WorkerCount, stop); err != nil {
						zap.L().Error("informer client error", zap.Error(err))
					}

					zap.L().Debug("informer restarts")
				}
			})

			g.Go(func() error {
				collection := resolver.TargetClients()
				if !collection.UsesSecrets() {
					return nil
				}

				readinessProbe.Wait()

				stop := make(chan struct{})
				if err := secretInformer.Sync(collection, stop); err != nil {
					zap.L().Error("secret informer error", zap.Error(err))

					return err
				}

				// stop is never closed within this function, so this goroutine
				// blocks here after a successful Sync.
				<-stop

				return nil
			})

			return g.Wait()
		},
	}

	// For local usage
	cmd.PersistentFlags().StringP("kubeconfig", "k", "", "absolute path to the kubeconfig file")
	cmd.PersistentFlags().StringP("config", "c", "", "target configuration file")
	cmd.PersistentFlags().IntP("port", "p", 8001, "http port for the optional rest api")
	cmd.PersistentFlags().StringP("dbfile", "d", "sqlite-database-v2.db", "path to the SQLite DB File")
	cmd.PersistentFlags().BoolP("metrics-enabled", "m", false, "Enable Policy Reporter's Metrics API")
	cmd.PersistentFlags().BoolP("rest-enabled", "r", false, "Enable Policy Reporter's REST API")
	cmd.PersistentFlags().Bool("profile", false, "Enable application profiling with pprof")
	cmd.PersistentFlags().String("lease-name", "policy-reporter", "name of the LeaseLock")
	cmd.PersistentFlags().String("pod-name", "policy-reporter", "name of the pod, used for leaderelection")
	cmd.PersistentFlags().StringP("template-dir", "t", "./templates", "template directory")
	cmd.PersistentFlags().Int("worker", 5, "amount of queue worker")
	cmd.PersistentFlags().Float32("qps", 20, "K8s RESTClient QPS")
	cmd.PersistentFlags().Int("burst", 50, "K8s RESTClient burst")

	// NOTE(review): this parses the standard library flag set while the
	// command is being constructed; presumably kept for dependencies that
	// register stdlib flags — confirm before changing.
	flag.Parse()

	return cmd
}