1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-01-20 18:52:16 +00:00

refactor: introduce cmd internal package (#5404)

* refactor: introduce cmd internal package

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* changelog

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* informer

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* tracing

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix flag

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
This commit is contained in:
Charles-Edouard Brétéché 2022-11-18 15:21:15 +01:00 committed by GitHub
parent 1b307ff6c5
commit 4bdd45c0cc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 315 additions and 224 deletions

View file

@ -5,6 +5,7 @@
- Flag `autogenInternals` was removed, policy mutation has been removed. - Flag `autogenInternals` was removed, policy mutation has been removed.
- Flag `leaderElectionRetryPeriod` was added to control leader election renewal frequency (default value is `2s`). - Flag `leaderElectionRetryPeriod` was added to control leader election renewal frequency (default value is `2s`).
- Support upper case `Audit` and `Enforce` in `.spec.validationFailureAction` of the Kyverno policy, failure actions `audit` and `enforce` are deprecated and will be removed in `v1.11.0`. - Support upper case `Audit` and `Enforce` in `.spec.validationFailureAction` of the Kyverno policy, failure actions `audit` and `enforce` are deprecated and will be removed in `v1.11.0`.
- Flag `profileAddress` was added to configure address of profiling server (default value is `""`).
## v1.8.1-rc3 ## v1.8.1-rc3

View file

@ -1,37 +0,0 @@
package main
import (
"context"
"reflect"
)
// TODO: eventually move this in an util package
type startable interface {
Start(stopCh <-chan struct{})
}
type informer interface {
startable
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
func startInformers[T startable](ctx context.Context, informers ...T) {
for i := range informers {
informers[i].Start(ctx.Done())
}
}
func waitForCacheSync(ctx context.Context, informers ...informer) bool {
ret := true
for i := range informers {
for _, result := range informers[i].WaitForCacheSync(ctx.Done()) {
ret = ret && result
}
}
return ret
}
func startInformersAndWaitForCacheSync(ctx context.Context, informers ...informer) bool {
startInformers(ctx, informers...)
return waitForCacheSync(ctx, informers...)
}

View file

@ -3,15 +3,14 @@ package main
import ( import (
"context" "context"
"flag" "flag"
"fmt"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"strconv"
"syscall" "syscall"
"time" "time"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"github.com/kyverno/kyverno/cmd/internal"
"github.com/kyverno/kyverno/pkg/clients/dclient" "github.com/kyverno/kyverno/pkg/clients/dclient"
kubeclientmetrics "github.com/kyverno/kyverno/pkg/clients/wrappers/metrics/kube" kubeclientmetrics "github.com/kyverno/kyverno/pkg/clients/wrappers/metrics/kube"
kubeclienttraces "github.com/kyverno/kyverno/pkg/clients/wrappers/traces/kube" kubeclienttraces "github.com/kyverno/kyverno/pkg/clients/wrappers/traces/kube"
@ -28,7 +27,6 @@ var (
kubeconfig string kubeconfig string
clientRateLimitQPS float64 clientRateLimitQPS float64
clientRateLimitBurst int clientRateLimitBurst int
logFormat string
otel string otel string
otelCollector string otelCollector string
metricsPort string metricsPort string
@ -37,13 +35,11 @@ var (
) )
const ( const (
resyncPeriod = 15 * time.Minute resyncPeriod = 15 * time.Minute
metadataResyncPeriod = 15 * time.Minute
) )
func parseFlags() error { func parseFlags(config internal.Configuration) {
logging.Init(nil) internal.InitFlags(config)
flag.StringVar(&logFormat, "loggingFormat", logging.TextFormat, "This determines the output format of the logger.")
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 20, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.") flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 20, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 50, "Configure the maximum burst for throttle. Uses the client default if zero.") flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 50, "Configure the maximum burst for throttle. Uses the client default if zero.")
@ -52,11 +48,7 @@ func parseFlags() error {
flag.StringVar(&transportCreds, "transportCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Metrics Client. If empty string is set, means an insecure connection will be used") flag.StringVar(&transportCreds, "transportCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Metrics Client. If empty string is set, means an insecure connection will be used")
flag.StringVar(&metricsPort, "metricsPort", "8000", "Expose prometheus metrics at the given port, default to 8000.") flag.StringVar(&metricsPort, "metricsPort", "8000", "Expose prometheus metrics at the given port, default to 8000.")
flag.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.") flag.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.")
if err := flag.Set("v", "2"); err != nil {
return err
}
flag.Parse() flag.Parse()
return nil
} }
func createKubeClients(logger logr.Logger) (*rest.Config, kubernetes.Interface, error) { func createKubeClients(logger logr.Logger) (*rest.Config, kubernetes.Interface, error) {
@ -81,7 +73,7 @@ func createInstrumentedClients(ctx context.Context, logger logr.Logger, clientCo
return nil, nil, err return nil, nil, err
} }
kubeClient = kubeclienttraces.Wrap(kubeClient) kubeClient = kubeclienttraces.Wrap(kubeClient)
dynamicClient, err := dclient.NewClient(ctx, clientConfig, kubeClient, metricsConfig, metadataResyncPeriod) dynamicClient, err := dclient.NewClient(ctx, clientConfig, kubeClient, metricsConfig, resyncPeriod)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -132,22 +124,19 @@ func setupSignals() (context.Context, context.CancelFunc) {
} }
func main() { func main() {
// config
appConfig := internal.NewConfiguration(internal.WithProfiling(), internal.WithTracing())
// parse flags // parse flags
if err := parseFlags(); err != nil { parseFlags(appConfig)
fmt.Println("failed to parse flags", err)
os.Exit(1)
}
// setup logger // setup logger
logLevel, err := strconv.Atoi(flag.Lookup("v").Value.String()) logger := internal.SetupLogger()
if err != nil { // setup maxprocs
fmt.Println("failed to setup logger", err) undo := internal.SetupMaxProcs(logger)
os.Exit(1) defer undo()
} // show version
if err := logging.Setup(logFormat, logLevel); err != nil { internal.ShowVersion(logger)
fmt.Println("failed to setup logger", err) // start profiling
os.Exit(1) internal.SetupProfiling(logger)
}
logger := logging.WithName("setup")
// create client config and kube clients // create client config and kube clients
clientConfig, rawClient, err := createKubeClients(logger) clientConfig, rawClient, err := createKubeClients(logger)
if err != nil { if err != nil {
@ -178,7 +167,7 @@ func main() {
secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister() secretLister := kubeKyvernoInformer.Core().V1().Secrets().Lister()
// start informers and wait for cache sync // start informers and wait for cache sync
// we need to call start again because we potentially registered new informers // we need to call start again because we potentially registered new informers
if !startInformersAndWaitForCacheSync(signalCtx, kubeKyvernoInformer) { if !internal.StartInformersAndWaitForCacheSync(signalCtx, kubeKyvernoInformer) {
os.Exit(1) os.Exit(1)
} }
server := NewServer( server := NewServer(

View file

@ -7,15 +7,14 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"flag" "flag"
"fmt"
"os" "os"
"os/signal" "os/signal"
"strconv"
"sync" "sync"
"syscall" "syscall"
"time" "time"
kyvernov1beta1 "github.com/kyverno/kyverno/api/kyverno/v1beta1" kyvernov1beta1 "github.com/kyverno/kyverno/api/kyverno/v1beta1"
"github.com/kyverno/kyverno/cmd/internal"
kyvernoclient "github.com/kyverno/kyverno/pkg/client/clientset/versioned" kyvernoclient "github.com/kyverno/kyverno/pkg/client/clientset/versioned"
"github.com/kyverno/kyverno/pkg/clients/dclient" "github.com/kyverno/kyverno/pkg/clients/dclient"
"github.com/kyverno/kyverno/pkg/config" "github.com/kyverno/kyverno/pkg/config"
@ -33,10 +32,8 @@ import (
var ( var (
kubeconfig string kubeconfig string
setupLog = logging.WithName("setup")
clientRateLimitQPS float64 clientRateLimitQPS float64
clientRateLimitBurst int clientRateLimitBurst int
logFormat string
) )
const ( const (
@ -45,36 +42,26 @@ const (
convertGenerateRequest string = "ConvertGenerateRequest" convertGenerateRequest string = "ConvertGenerateRequest"
) )
func parseFlags() error { func parseFlags(config internal.Configuration) {
logging.Init(nil) internal.InitFlags(config)
flag.StringVar(&logFormat, "loggingFormat", logging.TextFormat, "This determines the output format of the logger.")
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 0, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.") flag.Float64Var(&clientRateLimitQPS, "clientRateLimitQPS", 0, "Configure the maximum QPS to the Kubernetes API server from Kyverno. Uses the client default if zero.")
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 0, "Configure the maximum burst for throttle. Uses the client default if zero.") flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 0, "Configure the maximum burst for throttle. Uses the client default if zero.")
if err := flag.Set("v", "2"); err != nil {
return err
}
flag.Parse() flag.Parse()
return nil
} }
func main() { func main() {
// config
appConfig := internal.NewConfiguration()
// parse flags // parse flags
if err := parseFlags(); err != nil { parseFlags(appConfig)
fmt.Println("failed to parse flags", err)
os.Exit(1)
}
// setup logger // setup logger
logLevel, err := strconv.Atoi(flag.Lookup("v").Value.String()) logger := internal.SetupLogger()
if err != nil { // setup maxprocs
fmt.Println("failed to setup logger", err) undo := internal.SetupMaxProcs(logger)
os.Exit(1) defer undo()
} // show version
if err := logging.Setup(logFormat, logLevel); err != nil { internal.ShowVersion(logger)
fmt.Println("could not setup logger", err)
os.Exit(1)
}
// os signal handler // os signal handler
signalCtx, signalCancel := signal.NotifyContext(logging.Background(), os.Interrupt, syscall.SIGTERM) signalCtx, signalCancel := signal.NotifyContext(logging.Background(), os.Interrupt, syscall.SIGTERM)
defer signalCancel() defer signalCancel()
@ -84,13 +71,13 @@ func main() {
// create client config // create client config
clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst) clientConfig, err := config.CreateClientConfig(kubeconfig, clientRateLimitQPS, clientRateLimitBurst)
if err != nil { if err != nil {
setupLog.Error(err, "Failed to build kubeconfig") logger.Error(err, "Failed to build kubeconfig")
os.Exit(1) os.Exit(1)
} }
kubeClient, err := kubernetes.NewForConfig(clientConfig) kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil { if err != nil {
setupLog.Error(err, "Failed to create kubernetes client") logger.Error(err, "Failed to create kubernetes client")
os.Exit(1) os.Exit(1)
} }
@ -98,13 +85,13 @@ func main() {
// - client for all registered resources // - client for all registered resources
client, err := dclient.NewClient(signalCtx, clientConfig, kubeClient, nil, 15*time.Minute) client, err := dclient.NewClient(signalCtx, clientConfig, kubeClient, nil, 15*time.Minute)
if err != nil { if err != nil {
setupLog.Error(err, "Failed to create client") logger.Error(err, "Failed to create client")
os.Exit(1) os.Exit(1)
} }
pclient, err := kyvernoclient.NewForConfig(clientConfig) pclient, err := kyvernoclient.NewForConfig(clientConfig)
if err != nil { if err != nil {
setupLog.Error(err, "Failed to create client") logger.Error(err, "Failed to create client")
os.Exit(1) os.Exit(1)
} }
@ -185,7 +172,7 @@ func main() {
nil, nil,
) )
if err != nil { if err != nil {
setupLog.Error(err, "failed to elect a leader") logger.Error(err, "failed to elect a leader")
os.Exit(1) os.Exit(1)
} }

41
cmd/internal/config.go Normal file
View file

@ -0,0 +1,41 @@
package internal
type Configuration interface {
UsesTracing() bool
UsesProfiling() bool
}
func NewConfiguration(options ...ConfigurationOption) Configuration {
c := &configuration{}
for _, option := range options {
option(c)
}
return c
}
type ConfigurationOption func(c *configuration)
func WithTracing() ConfigurationOption {
return func(c *configuration) {
c.usesTracing = true
}
}
func WithProfiling() ConfigurationOption {
return func(c *configuration) {
c.usesProfiling = true
}
}
type configuration struct {
usesTracing bool
usesProfiling bool
}
func (c *configuration) UsesTracing() bool {
return c.usesTracing
}
func (c *configuration) UsesProfiling() bool {
return c.usesProfiling
}

22
cmd/internal/error.go Normal file
View file

@ -0,0 +1,22 @@
package internal
import (
"fmt"
"os"
"github.com/go-logr/logr"
)
func checkErr(err error, msg string) {
if err != nil {
fmt.Println(msg, err)
os.Exit(1)
}
}
func checkError(logger logr.Logger, err error, msg string, keysAndValues ...interface{}) {
if err != nil {
logger.Error(err, msg, keysAndValues...)
os.Exit(1)
}
}

53
cmd/internal/flag.go Normal file
View file

@ -0,0 +1,53 @@
package internal
import (
"flag"
"github.com/kyverno/kyverno/pkg/logging"
)
var (
// logging
loggingFormat string
// profiling
profilingEnabled bool
profilingAddress string
profilingPort string
// tracing
tracingEnabled bool
tracingAddress string
tracingPort string
tracingCreds string
)
func initLoggingFlags() {
logging.InitFlags(nil)
flag.StringVar(&loggingFormat, "loggingFormat", logging.TextFormat, "This determines the output format of the logger.")
checkErr(flag.Set("v", "2"), "failed to init flags")
}
func initProfilingFlags() {
flag.BoolVar(&profilingEnabled, "profile", false, "Set this flag to 'true', to enable profiling.")
flag.StringVar(&profilingPort, "profilePort", "6060", "Profiling server port, defaults to '6060'.")
flag.StringVar(&profilingAddress, "profileAddress", "", "Profiling server address, defaults to ''.")
}
func initTracingFlags() {
flag.BoolVar(&tracingEnabled, "enableTracing", false, "Set this flag to 'true', to enable tracing.")
flag.StringVar(&tracingPort, "tracingPort", "4317", "Tracing receiver port, defaults to '4317'.")
flag.StringVar(&tracingAddress, "tracingAddress", "", "Tracing receiver address, defaults to ''.")
flag.StringVar(&tracingCreds, "tracingCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Tracing Client. If empty string is set, means an insecure connection will be used")
}
func InitFlags(config Configuration) {
// logging
initLoggingFlags()
// profiling
if config.UsesProfiling() {
initProfilingFlags()
}
// tracing
if config.UsesTracing() {
initTracingFlags()
}
}

View file

@ -1,11 +1,10 @@
package main package internal
import ( import (
"context" "context"
"reflect" "reflect"
) )
// TODO: eventually move this in an util package
type startable interface { type startable interface {
Start(stopCh <-chan struct{}) Start(stopCh <-chan struct{})
} }
@ -15,13 +14,13 @@ type informer interface {
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
} }
func startInformers[T startable](ctx context.Context, informers ...T) { func StartInformers[T startable](ctx context.Context, informers ...T) {
for i := range informers { for i := range informers {
informers[i].Start(ctx.Done()) informers[i].Start(ctx.Done())
} }
} }
func waitForCacheSync(ctx context.Context, informers ...informer) bool { func WaitForCacheSync(ctx context.Context, informers ...informer) bool {
ret := true ret := true
for i := range informers { for i := range informers {
for _, result := range informers[i].WaitForCacheSync(ctx.Done()) { for _, result := range informers[i].WaitForCacheSync(ctx.Done()) {
@ -31,7 +30,7 @@ func waitForCacheSync(ctx context.Context, informers ...informer) bool {
return ret return ret
} }
func checkCacheSync[T comparable](status map[T]bool) bool { func CheckCacheSync[T comparable](status map[T]bool) bool {
ret := true ret := true
for _, s := range status { for _, s := range status {
ret = ret && s ret = ret && s
@ -39,7 +38,7 @@ func checkCacheSync[T comparable](status map[T]bool) bool {
return ret return ret
} }
func startInformersAndWaitForCacheSync(ctx context.Context, informers ...informer) bool { func StartInformersAndWaitForCacheSync(ctx context.Context, informers ...informer) bool {
startInformers(ctx, informers...) StartInformers(ctx, informers...)
return waitForCacheSync(ctx, informers...) return WaitForCacheSync(ctx, informers...)
} }

16
cmd/internal/logging.go Normal file
View file

@ -0,0 +1,16 @@
package internal
import (
"flag"
"strconv"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/logging"
)
func SetupLogger() logr.Logger {
logLevel, err := strconv.Atoi(flag.Lookup("v").Value.String())
checkErr(err, "failed to setup logger")
checkErr(logging.Setup(loggingFormat, logLevel), "failed to setup logger")
return logging.WithName("setup")
}

21
cmd/internal/maxprocs.go Normal file
View file

@ -0,0 +1,21 @@
package internal
import (
"fmt"
"github.com/go-logr/logr"
"go.uber.org/automaxprocs/maxprocs"
)
func SetupMaxProcs(logger logr.Logger) func() {
logger = logger.WithName("maxprocs")
undo, err := maxprocs.Set(
maxprocs.Logger(
func(format string, args ...interface{}) {
logger.Info(fmt.Sprintf(format, args...))
},
),
)
checkError(logger, err, "failed to configure maxprocs")
return undo
}

16
cmd/internal/profiling.go Normal file
View file

@ -0,0 +1,16 @@
package internal
import (
"net"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/profiling"
)
func SetupProfiling(logger logr.Logger) {
logger = logger.WithName("profiling").WithValues("enabled", profilingEnabled, "address", profilingAddress, "port", profilingPort)
logger.Info("start profiling...")
if profilingEnabled {
profiling.Start(logger, net.JoinHostPort(profilingAddress, profilingPort))
}
}

27
cmd/internal/tracing.go Normal file
View file

@ -0,0 +1,27 @@
package internal
import (
"context"
"net"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/tracing"
"k8s.io/client-go/kubernetes"
)
func SetupTracing(logger logr.Logger, name string, kubeClient kubernetes.Interface) context.CancelFunc {
logger = logger.WithName("tracing").WithValues("enabled", tracingEnabled, "address", tracingAddress, "port", tracingPort, "creds", tracingCreds)
logger.Info("setup tracing...")
if tracingEnabled {
shutdown, err := tracing.NewTraceConfig(
logger,
name,
net.JoinHostPort(tracingAddress, tracingPort),
tracingCreds,
kubeClient,
)
checkError(logger, err, "failed to setup tracing")
return shutdown
}
return func() {}
}

11
cmd/internal/version.go Normal file
View file

@ -0,0 +1,11 @@
package internal
import (
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/version"
)
func ShowVersion(logger logr.Logger) {
logger = logger.WithName("version")
version.PrintVersionInfo(logger)
}

View file

@ -7,16 +7,15 @@ import (
"flag" "flag"
"fmt" "fmt"
"net/http" "net/http"
_ "net/http/pprof" // #nosec
"os" "os"
"os/signal" "os/signal"
"strconv"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"github.com/kyverno/kyverno/cmd/internal"
"github.com/kyverno/kyverno/pkg/background" "github.com/kyverno/kyverno/pkg/background"
"github.com/kyverno/kyverno/pkg/client/clientset/versioned" "github.com/kyverno/kyverno/pkg/client/clientset/versioned"
kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions" kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions"
@ -47,15 +46,12 @@ import (
"github.com/kyverno/kyverno/pkg/registryclient" "github.com/kyverno/kyverno/pkg/registryclient"
"github.com/kyverno/kyverno/pkg/tls" "github.com/kyverno/kyverno/pkg/tls"
"github.com/kyverno/kyverno/pkg/toggle" "github.com/kyverno/kyverno/pkg/toggle"
"github.com/kyverno/kyverno/pkg/tracing"
"github.com/kyverno/kyverno/pkg/utils" "github.com/kyverno/kyverno/pkg/utils"
runtimeutils "github.com/kyverno/kyverno/pkg/utils/runtime" runtimeutils "github.com/kyverno/kyverno/pkg/utils/runtime"
"github.com/kyverno/kyverno/pkg/version"
"github.com/kyverno/kyverno/pkg/webhooks" "github.com/kyverno/kyverno/pkg/webhooks"
webhookspolicy "github.com/kyverno/kyverno/pkg/webhooks/policy" webhookspolicy "github.com/kyverno/kyverno/pkg/webhooks/policy"
webhooksresource "github.com/kyverno/kyverno/pkg/webhooks/resource" webhooksresource "github.com/kyverno/kyverno/pkg/webhooks/resource"
webhookgenerate "github.com/kyverno/kyverno/pkg/webhooks/updaterequest" webhookgenerate "github.com/kyverno/kyverno/pkg/webhooks/updaterequest"
"go.uber.org/automaxprocs/maxprocs" // #nosec
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@ -74,14 +70,11 @@ var (
// will be removed in future and the configuration will be set only via configmaps // will be removed in future and the configuration will be set only via configmaps
kubeconfig string kubeconfig string
serverIP string serverIP string
profilePort string
metricsPort string metricsPort string
webhookTimeout int webhookTimeout int
genWorkers int genWorkers int
maxQueuedEvents int maxQueuedEvents int
profile bool
disableMetricsExport bool disableMetricsExport bool
enableTracing bool
otel string otel string
otelCollector string otelCollector string
transportCreds string transportCreds string
@ -96,26 +89,21 @@ var (
admissionReports bool admissionReports bool
reportsChunkSize int reportsChunkSize int
backgroundScanWorkers int backgroundScanWorkers int
logFormat string
dumpPayload bool dumpPayload bool
leaderElectionRetryPeriod time.Duration leaderElectionRetryPeriod time.Duration
// DEPRECATED: remove in 1.9 // DEPRECATED: remove in 1.9
splitPolicyReport bool splitPolicyReport bool
) )
func parseFlags() error { func parseFlags(config internal.Configuration) {
logging.Init(nil) internal.InitFlags(config)
flag.StringVar(&logFormat, "loggingFormat", logging.TextFormat, "This determines the output format of the logger.")
flag.BoolVar(&dumpPayload, "dumpPayload", false, "Set this flag to activate/deactivate debug mode.") flag.BoolVar(&dumpPayload, "dumpPayload", false, "Set this flag to activate/deactivate debug mode.")
flag.IntVar(&webhookTimeout, "webhookTimeout", webhookcontroller.DefaultWebhookTimeout, "Timeout for webhook configurations.") flag.IntVar(&webhookTimeout, "webhookTimeout", webhookcontroller.DefaultWebhookTimeout, "Timeout for webhook configurations.")
flag.IntVar(&genWorkers, "genWorkers", 10, "Workers for generate controller.") flag.IntVar(&genWorkers, "genWorkers", 10, "Workers for generate controller.")
flag.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.") flag.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.")
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&serverIP, "serverIP", "", "IP address where Kyverno controller runs. Only required if out-of-cluster.") flag.StringVar(&serverIP, "serverIP", "", "IP address where Kyverno controller runs. Only required if out-of-cluster.")
flag.BoolVar(&profile, "profile", false, "Set this flag to 'true', to enable profiling.")
flag.StringVar(&profilePort, "profilePort", "6060", "Enable profiling at given port, defaults to 6060.")
flag.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.") flag.BoolVar(&disableMetricsExport, "disableMetrics", false, "Set this flag to 'true' to disable metrics.")
flag.BoolVar(&enableTracing, "enableTracing", false, "Set this flag to 'true', to enable exposing traces.")
flag.StringVar(&otel, "otelConfig", "prometheus", "Set this flag to 'grpc', to enable exporting metrics to an Opentelemetry Collector. The default collector is set to \"prometheus\"") flag.StringVar(&otel, "otelConfig", "prometheus", "Set this flag to 'grpc', to enable exporting metrics to an Opentelemetry Collector. The default collector is set to \"prometheus\"")
flag.StringVar(&otelCollector, "otelCollector", "opentelemetrycollector.kyverno.svc.cluster.local", "Set this flag to the OpenTelemetry Collector Service Address. Kyverno will try to connect to this on the metrics port.") flag.StringVar(&otelCollector, "otelCollector", "opentelemetrycollector.kyverno.svc.cluster.local", "Set this flag to the OpenTelemetry Collector Service Address. Kyverno will try to connect to this on the metrics port.")
flag.StringVar(&transportCreds, "transportCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Metrics Client. If empty string is set, means an insecure connection will be used") flag.StringVar(&transportCreds, "transportCreds", "", "Set this flag to the CA secret containing the certificate which is used by our Opentelemetry Metrics Client. If empty string is set, means an insecure connection will be used")
@ -136,43 +124,7 @@ func parseFlags() error {
flag.DurationVar(&leaderElectionRetryPeriod, "leaderElectionRetryPeriod", leaderelection.DefaultRetryPeriod, "Configure leader election retry period.") flag.DurationVar(&leaderElectionRetryPeriod, "leaderElectionRetryPeriod", leaderelection.DefaultRetryPeriod, "Configure leader election retry period.")
// DEPRECATED: remove in 1.9 // DEPRECATED: remove in 1.9
flag.BoolVar(&splitPolicyReport, "splitPolicyReport", false, "This is deprecated, please don't use it, will be removed in v1.9.") flag.BoolVar(&splitPolicyReport, "splitPolicyReport", false, "This is deprecated, please don't use it, will be removed in v1.9.")
if err := flag.Set("v", "2"); err != nil {
return err
}
flag.Parse() flag.Parse()
return nil
}
func setupMaxProcs(logger logr.Logger) (func(), error) {
logger = logger.WithName("maxprocs")
if undo, err := maxprocs.Set(maxprocs.Logger(func(format string, args ...interface{}) {
logger.Info(fmt.Sprintf(format, args...))
})); err != nil {
return nil, err
} else {
return undo, nil
}
}
func startProfiling(logger logr.Logger) {
logger = logger.WithName("profiling")
logger.Info("start profiling...", "profile", profile, "port", profilePort)
if profile {
addr := ":" + profilePort
logger.Info("Enable profiling, see details at https://github.com/kyverno/kyverno/wiki/Profiling-Kyverno-on-Kubernetes", "port", profilePort)
go func() {
s := http.Server{
Addr: addr,
Handler: nil,
ErrorLog: logging.StdLogger(logger, ""),
ReadHeaderTimeout: 30 * time.Second,
}
if err := s.ListenAndServe(); err != nil {
logger.Error(err, "failed to enable profiling", "address", addr)
os.Exit(1)
}
}()
}
} }
func createKubeClients(logger logr.Logger) (*rest.Config, kubernetes.Interface, metadataclient.Interface, error) { func createKubeClients(logger logr.Logger) (*rest.Config, kubernetes.Interface, metadataclient.Interface, error) {
@ -265,24 +217,6 @@ func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics
return metricsConfig, cancel, nil return metricsConfig, cancel, nil
} }
func setupTracing(logger logr.Logger, kubeClient kubernetes.Interface) (context.CancelFunc, error) {
logger = logger.WithName("tracing")
logger.Info("setup tracing...", "enabled", enableTracing, "port", otelCollector, "creds", transportCreds)
var cancel context.CancelFunc
if enableTracing {
tracerProvider, err := tracing.NewTraceConfig(otelCollector, transportCreds, kubeClient, logging.WithName("tracing"))
if err != nil {
return nil, err
}
cancel = func() {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
defer tracing.ShutDownController(ctx, tracerProvider)
}
}
return cancel, nil
}
func setupRegistryClient(logger logr.Logger, kubeClient kubernetes.Interface) error { func setupRegistryClient(logger logr.Logger, kubeClient kubernetes.Interface) error {
logger = logger.WithName("registry-client") logger = logger.WithName("registry-client")
logger.Info("setup registry client...", "secrets", imagePullSecrets, "insecure", allowInsecureRegistry) logger.Info("setup registry client...", "secrets", imagePullSecrets, "insecure", allowInsecureRegistry)
@ -326,11 +260,6 @@ func showWarnings(logger logr.Logger) {
} }
} }
func showVersion(logger logr.Logger) {
logger = logger.WithName("version")
version.PrintVersionInfo(logger)
}
func sanityChecks(dynamicClient dclient.Interface) error { func sanityChecks(dynamicClient dclient.Interface) error {
if !utils.CRDsInstalled(dynamicClient.Discovery()) { if !utils.CRDsInstalled(dynamicClient.Discovery()) {
return fmt.Errorf("CRDs not installed") return fmt.Errorf("CRDs not installed")
@ -523,35 +452,21 @@ func createrLeaderControllers(
} }
func main() { func main() {
// config
appConfig := internal.NewConfiguration(internal.WithProfiling(), internal.WithTracing())
// parse flags // parse flags
if err := parseFlags(); err != nil { parseFlags(appConfig)
fmt.Println("failed to parse flags", err)
os.Exit(1)
}
// setup logger // setup logger
logLevel, err := strconv.Atoi(flag.Lookup("v").Value.String()) logger := internal.SetupLogger()
if err != nil {
fmt.Println("failed to setup logger", err)
os.Exit(1)
}
if err := logging.Setup(logFormat, logLevel); err != nil {
fmt.Println("failed to setup logger", err)
os.Exit(1)
}
logger := logging.WithName("setup")
// setup maxprocs // setup maxprocs
if undo, err := setupMaxProcs(logger); err != nil { undo := internal.SetupMaxProcs(logger)
logger.Error(err, "failed to configure maxprocs") defer undo()
os.Exit(1)
} else {
defer undo()
}
// show version // show version
showWarnings(logger) showWarnings(logger)
// show version // show version
showVersion(logger) internal.ShowVersion(logger)
// start profiling // start profiling
startProfiling(logger) internal.SetupProfiling(logger)
// create client config and kube clients // create client config and kube clients
clientConfig, rawClient, metadataClient, err := createKubeClients(logger) clientConfig, rawClient, metadataClient, err := createKubeClients(logger)
if err != nil { if err != nil {
@ -577,12 +492,8 @@ func main() {
os.Exit(1) os.Exit(1)
} }
// setup tracing // setup tracing
if tracingShutdown, err := setupTracing(logger, kubeClient); err != nil { tracingShutdown := internal.SetupTracing(logger, "kyverno", kubeClient)
logger.Error(err, "failed to setup tracing") defer tracingShutdown()
os.Exit(1)
} else if tracingShutdown != nil {
defer tracingShutdown()
}
// setup registry client // setup registry client
if err := setupRegistryClient(logger, kubeClient); err != nil { if err := setupRegistryClient(logger, kubeClient); err != nil {
logger.Error(err, "failed to setup registry client") logger.Error(err, "failed to setup registry client")
@ -652,7 +563,7 @@ func main() {
openApiManager, openApiManager,
) )
// start informers and wait for cache sync // start informers and wait for cache sync
if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) { if !internal.StartInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync") logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1) os.Exit(1)
} }
@ -705,12 +616,12 @@ func main() {
os.Exit(1) os.Exit(1)
} }
// start informers and wait for cache sync // start informers and wait for cache sync
if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) { if !internal.StartInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync") logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1) os.Exit(1)
} }
startInformers(signalCtx, metadataInformer) internal.StartInformers(signalCtx, metadataInformer)
if !checkCacheSync(metadataInformer.WaitForCacheSync(signalCtx.Done())) { if !internal.CheckCacheSync(metadataInformer.WaitForCacheSync(signalCtx.Done())) {
// TODO: shall we just exit ? // TODO: shall we just exit ?
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync") logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
} }
@ -789,7 +700,7 @@ func main() {
) )
// start informers and wait for cache sync // start informers and wait for cache sync
// we need to call start again because we potentially registered new informers // we need to call start again because we potentially registered new informers
if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) { if !internal.StartInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync") logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1) os.Exit(1)
} }

View file

@ -26,6 +26,8 @@ const (
TextFormat = "text" TextFormat = "text"
// LogLevelController is the log level to use for controllers plumbing. // LogLevelController is the log level to use for controllers plumbing.
LogLevelController = 3 LogLevelController = 3
// LogLevelClient is the log level to use for clients.
LogLevelClient = 3
) )
// Initially, globalLog comes from controller-runtime/log with logger created earlier by controller-runtime. // Initially, globalLog comes from controller-runtime/log with logger created earlier by controller-runtime.
@ -34,7 +36,7 @@ const (
// All loggers created after logging.Setup won't be subject to the call depth limitation and will work if the underlying sink supports it. // All loggers created after logging.Setup won't be subject to the call depth limitation and will work if the underlying sink supports it.
var globalLog = log.Log var globalLog = log.Log
func Init(flags *flag.FlagSet) { func InitFlags(flags *flag.FlagSet) {
// clear flags initialized in static dependencies // clear flags initialized in static dependencies
if flag.CommandLine.Lookup("log_dir") != nil { if flag.CommandLine.Lookup("log_dir") != nil {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
@ -77,6 +79,11 @@ func ControllerLogger(name string) logr.Logger {
return globalLog.WithName(name).V(LogLevelController) return globalLog.WithName(name).V(LogLevelController)
} }
// ClientLogger returns a logr.Logger to be used by clients.
func ClientLogger(name string) logr.Logger {
return globalLog.WithName(name).V(LogLevelClient)
}
// WithName returns a new logr.Logger instance with the specified name element added to the Logger's name. // WithName returns a new logr.Logger instance with the specified name element added to the Logger's name.
func WithName(name string) logr.Logger { func WithName(name string) logr.Logger {
return GlobalLogger().WithName(name) return GlobalLogger().WithName(name)

26
pkg/profiling/pprof.go Normal file
View file

@ -0,0 +1,26 @@
package profiling
import (
"net/http"
_ "net/http/pprof" // #nosec
"os"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/logging"
)
func Start(logger logr.Logger, address string) {
logger.Info("Enable profiling, see details at https://github.com/kyverno/kyverno/wiki/Profiling-Kyverno-on-Kubernetes")
go func() {
s := http.Server{
Addr: address,
ErrorLog: logging.StdLogger(logger, ""),
ReadHeaderTimeout: 30 * time.Second,
}
if err := s.ListenAndServe(); err != nil {
logger.Error(err, "failed to enable profiling")
os.Exit(1)
}
}()
}

View file

@ -2,6 +2,7 @@ package tracing
import ( import (
"context" "context"
"time"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/utils/kube" "github.com/kyverno/kyverno/pkg/utils/kube"
@ -17,15 +18,8 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
func ShutDownController(ctx context.Context, tp *sdktrace.TracerProvider) { // NewTraceConfig generates the initial tracing configuration with 'address' as the endpoint to connect to the Opentelemetry Collector
// pushes any last exports to the receiver func NewTraceConfig(log logr.Logger, name, address, certs string, kubeClient kubernetes.Interface) (func(), error) {
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}
// NewTraceConfig generates the initial tracing configuration with 'endpoint' as the endpoint to connect to the Opentelemetry Collector
func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interface, log logr.Logger) (*sdktrace.TracerProvider, error) {
ctx := context.Background() ctx := context.Background()
var client otlptrace.Client var client otlptrace.Client
@ -38,12 +32,12 @@ func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interfa
} }
client = otlptracegrpc.NewClient( client = otlptracegrpc.NewClient(
otlptracegrpc.WithEndpoint(endpoint), otlptracegrpc.WithEndpoint(address),
otlptracegrpc.WithTLSCredentials(transportCreds), otlptracegrpc.WithTLSCredentials(transportCreds),
) )
} else { } else {
client = otlptracegrpc.NewClient( client = otlptracegrpc.NewClient(
otlptracegrpc.WithEndpoint(endpoint), otlptracegrpc.WithEndpoint(address),
otlptracegrpc.WithInsecure(), otlptracegrpc.WithInsecure(),
) )
} }
@ -56,7 +50,7 @@ func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interfa
} }
res, err := resource.New(context.Background(), res, err := resource.New(context.Background(),
resource.WithAttributes(semconv.ServiceNameKey.String("kyverno_traces")), resource.WithAttributes(semconv.ServiceNameKey.String(name)),
resource.WithSchemaURL(semconv.SchemaURL), resource.WithSchemaURL(semconv.SchemaURL),
) )
if err != nil { if err != nil {
@ -75,7 +69,14 @@ func NewTraceConfig(endpoint string, certs string, kubeClient kubernetes.Interfa
// set global propagator to tracecontext (the default is no-op). // set global propagator to tracecontext (the default is no-op).
otel.SetTextMapPropagator(propagation.TraceContext{}) otel.SetTextMapPropagator(propagation.TraceContext{})
otel.SetTracerProvider(tp) otel.SetTracerProvider(tp)
return tp, nil return func() {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
// pushes any last exports to the receiver
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}, nil
} }
// DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. // DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any.